The aim of this project is to showcase skills in data wrangling. Data wrangling is the process of extracting, transforming, cleaning, and mapping data from a raw data form into another format with the intent of making it more appropriate and valuable for a variety of downstream purposes such as analytics.
OpenStreetMap is an open source database of map data built by a community of mappers who contribute and maintain data about roads, places, public transit, and much more, all over the world. OpenStreetMap emphasizes local knowledge, and contributors input data from their communities. Much of the data is entered manually, meaning it is expected to be dirty (erroneous).
The data munging process begins with extracting the data in a raw form from the data source, cleaning the raw data, and manipulating the data using algorithms or parsing the data into predefined data structures, and finally depositing the resulting content into a data sink for storage and future use.
The data transformations are applied to distinct entities (e.g. fields, rows, columns, data values etc.) within a data set, and could include such actions as extractions, parsing, joining, standardizing, augmenting, cleansing, consolidating, and filtering to create desired wrangling outputs that can be leveraged downstream.
The area chosen is the Upper West Side neighborhood in Manhattan, New York City where the author resides and has local knowledge. The data was surprisingly clean and my mentor reviewing this project should note that the sample database provided was intentionally manually corrupted to test the code submitted.
Specifically, the area is from West 59th Street, Columbus Circle up to West 101st Street (North - South direction) and from the West Side Highway, NY Route 9A to Central Park West (East - West direction).
The data file, UpperWestSideFull.osm is 78.2MB. A smaller, test data file, UpperWestSideTest.osm is 10.1MB.
The Topologically Integrated Geographic Encoding and Referencing system (TIGER) data, produced by the US Census Bureau, is a public domain data source which has many geographic features. The TIGER/Line files are extracts of selected geographic information, including roads, boundaries, and hydrography features. All of the roads were imported into OpenStreetMap in 2007 and 2008, populating the nearly empty map of the United States.
The data wrangling in this project focuses on data quality. In particular, data quality is determined by the following measures:
A key dimension of the process is to clean any bad data:
Correcting the data involves removing typographical errors, enhancing data, validating data against known entities, changing or mapping data, and standardizing or formatting data.
It should be noted, that the data wrangling employed is customized to the dataset and the results required for a particular project. Each situation will be unique based on the needs of the output.
The map data for the given region is obtained from OpenStreetMap.org in an XML format file. Python programming is used to read in the data, perform the munging, and write out the data to CSV files. Then, the CSV files are imported into a SQLite database where the data analysis is performed.

Relevant data imported from the raw XML file includes:
Data quality checks and corrections are applied to the following data:
(listed with the Open Street Map tag keys)
| Attribute | Tag Key |
|---|---|
| Address house number | key = addr:housenumber |
| Address street | key = addr:street |
| Address city | key = addr:city |
| Address state | key = addr:state |
| Address postal code | key = addr:postcode |
| Telephone number | key = phone |
| Email address | key = email |
| Website URL | key = website or key = url |
| TIGER info | key = tiger |
| Name of the establishment | key = name |
| Type of store | key = shop |
| Type of building | key = building |
| Type of amenity | key = amenity |
| Type of cuisine | key = cuisine |
| Inscription text | key = inscription |
The dataset is contained in an XML file. The main function iteratively reads each XML element from the file, processes the element, and writes the parsed data to a CSV file for later import to a database. The central process builds a Python dictionary for the current element in the iteration, and cleans the data. The dictionary is constructed conforming to a defined schema, and each element is validated against the schema before writing it to the CSV file.
An SQL database is created according to the schema, and the data from the CSV files are imported. The data is analyzed issuing SQL commands to the database.
Run a first pass scan over the dataset:
Use reverse logic: The correct data is known. Identify any data that is rejected on correctness. Accuracy is determined by matching against known correct data and implementing data rules using logic and regular expressions.
Python code:
initial_scan.py
Read in the data. Determine the size of the dataset. Identify problems and the magnitude of the problematic data. The methodology employs regular expressions, lookup lists, and counts.
The idea is to get a handle on how dirty the data is. Perform a count of all the different tags and then compare the problem data to the total number of tags.
# Filename: initial_scan.py
# Python 3.7
# Initial data scan to approximately size the problem data
import xml.etree.cElementTree as ET
from collections import defaultdict
import re
from email.utils import parseaddr
from urllib.parse import urlparse
import pprint
import operator
pp = pprint.PrettyPrinter(indent=4, width=20)
map_file = 'UpperWestSideFull.osm' # File size is 78.2MB
streets_issue = defaultdict(int)
us_states_issue = defaultdict(int)
cities_issue = defaultdict(int)
phones_issue = defaultdict(int)
emails_issue = defaultdict(int)
websites_issue = defaultdict(int)
zipcodes_issue = defaultdict(int)
zips_outside = defaultdict(int)
tiger_issue = defaultdict(int)
counts = defaultdict(int)
# Known correct data lists
ok_streets = [ 'Americas', 'Avenue', 'Boulevard', 'Broadway', 'Circle', 'Court', 'Drive', 'East', 'Lane',
'North', 'Parkway', 'Place', 'Plaza', 'Road', 'South', 'Square', 'Street', 'Terrace', 'Walk',
'Way', 'West']
ok_city = ['New York', 'New York City']
ok_domains = ['.com', '.org', '.net', '.edu', '.gov', '.us', '.nyc', '.biz', '.info', '.io', '.it',
'.co', '.site', '.cz', '.hu', '.int']
uws_zips = ['10023', '10024', '10025'] # Upper West Side zip codes
# Regular expressions
# r prefix means raw string -- send RE module backslash -- RE module must process backslash as escape
phone_re = re.compile(r"^(\+?1?\s?-?\(?\)?)(\d{3})\D*(\d{3})\D*(\d{4})$") # r: RE must process backslash as escape
email_re = re.compile(r"[\w.-]+@[\w.-]+")
website_re = re.compile(r"^(http\:\/\/)?(https\:\/\/)?([\w.-])*[\w-]+\.([a-zA-Z][a-zA-Z][a-zA-Z]?[a-zA-Z]?)(\/[\w/.#?=%&,!+()-]*)?$")
street_re = re.compile(r"\b\S+\.?$", re.IGNORECASE)
def count_issues_states(name):
"""If state is not NY, record the problem and return None.
Arguments:
name -- the value of the addr:state key
"""
if not name or name != 'NY':
us_states_issue[name] += 1
return
def count_issues_cities(name):
"""If city is not in list, record the problem and return None.
Arguments:
name -- the value of the addr:city key
"""
if not name or name not in ok_city:
cities_issue[name] += 1
return
def count_issues_phones(name):
"""If phone number is a dud or does not fit pattern, record the problem and return None.
Arguments:
name -- the value of the phone key
"""
if not name or name.isspace():
phones_issue[name] += 1
return
match = phone_re.search(name)
if not match:
phones_issue[name] += 1
return
def count_issues_emails(name):
"""If email is a dud or does not parse, record the problem and return None.
Arguments:
name -- the value of the email key
"""
if not name or name.isspace():
emails_issue[name] += 1
return
name = name.lower()
first_parse = parseaddr(name)
second_parse = not email_re.search(name)
third_parse = not first_parse[1].endswith(tuple(ok_domains))
if first_parse == ('', '') or second_parse or third_parse:
emails_issue[name] += 1
return
def count_issues_websites(name):
"""If website is a dud or does not parse, record the problem and return None.
Arguments:
name -- the value of the website or url key
"""
if not name or name.isspace():
websites_issue[name] += 1
return
name = name.strip().lower()
flag_1 = True
first_parse = urlparse(name)
if (first_parse.scheme == '') and (first_parse.netloc == '') and (first_parse.path == ''):
flag_1 = False
match = website_re.search(name)
flag_2 = False
for domain in ok_domains:
if domain in name:
flag_2 = True
break
else:
flag_2 = False
if not (match and flag_1 and flag_2):
websites_issue[name] += 1
return
def count_issues_zipcodes(name):
"""If zip code is a dud or does not parse, record the problem and return None.
Arguments:
name -- the value of the addr:postcode key
"""
if not name or name.isspace():
zipcodes_issue[name] += 1
return
if (not name.isdigit()) or len(name) != 5:
zipcodes_issue[name] += 1
return
if name not in uws_zips:
zips_outside[name] += 1
return
def count_issues_tiger(name):
"""If TIGER is a dud or not in list, record the problem and return None.
Arguments:
name -- the value of the tiger:reviewed key
"""
if not name or name.isspace():
tiger_issue[name] += 1
return
if name not in ['yes', 'no']:
tiger_issue[name] += 1
return
def count_issues_streets(name):
"""If street is a dud or does not parse, record the problem and return None.
Arguments:
name -- the value of the addr:street key
"""
if not name or name.isspace():
streets_issue[name] += 1
return
m = street_re.search(name)
if m:
street_type = m.group() # returns a String
if street_type not in ok_streets:
streets_issue[street_type] += 1
return
def is_street(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "addr:street")
def is_state(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "addr:state")
def is_city(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "addr:city")
def is_phone(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "phone")
def is_email(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "email")
def is_website(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "website" or (elem.attrib["k"] == "url"))
def is_zipcode(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "addr:postcode")
def is_tiger(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "tiger:reviewed")
def initialize_dicts():
"""Flush the toilet and return a boolean."""
try:
streets_issue.clear()
us_states_issue.clear()
cities_issue.clear()
phones_issue.clear()
emails_issue.clear()
websites_issue.clear()
zipcodes_issue.clear()
zips_outside.clear()
tiger_issue.clear()
counts.clear()
except:
return None
return True
#----------------------#
# Main routine #
#----------------------#
def initial_count_problems():
"""Count the dataset and the problem data, print a report, and return None."""
response = initialize_dicts()
if not response:
print ('Fatal Error initializing dictionaries')
print ('\nTerminating execution...')
return None
osm_file = open(map_file, "r")
for event, elem in ET.iterparse(osm_file):
if event == "end":
if is_street(elem):
counts['total_street'] += 1
count_issues_streets(elem.attrib['v'])
if is_city(elem):
counts['total_city'] += 1
count_issues_cities(elem.attrib["v"])
if is_state(elem):
counts['total_state'] += 1
count_issues_states(elem.attrib["v"])
if is_zipcode(elem):
counts['total_zipcode'] += 1
count_issues_zipcodes(elem.attrib["v"])
if is_phone(elem):
counts['total_phone'] += 1
count_issues_phones(elem.attrib["v"])
if is_email(elem):
counts['total_email'] += 1
count_issues_emails(elem.attrib["v"])
if is_website(elem):
counts['total_website'] += 1
count_issues_websites(elem.attrib["v"])
if is_tiger(elem):
counts['total_tiger'] += 1
count_issues_tiger(elem.attrib["v"])
counts['record_count'] += 1
elem.clear()
osm_file.close()
print_initial_scan()
return
def print_initial_scan():
"""Print a report and return None."""
streets_issue_sort = sorted(streets_issue.items(), key=operator.itemgetter(1))
streets_issue_sort.reverse() # Note: In-place reversal
cities_issue_sort = sorted(cities_issue.items(), key=operator.itemgetter(1))
cities_issue_sort.reverse() # Note: In-place reversal
us_states_issue_sort = sorted(us_states_issue.items(), key=operator.itemgetter(1))
us_states_issue_sort.reverse() # Note: In-place reversal
zipcodes_issue_sort = sorted(zipcodes_issue.items(), key=operator.itemgetter(1))
zipcodes_issue_sort.reverse() # Note: In-place reversal
zips_outside_sort = sorted(zips_outside.items(), key=operator.itemgetter(1))
zips_outside_sort.reverse() # Note: In-place reversal
phones_issue_sort = sorted(phones_issue.items(), key=operator.itemgetter(1))
phones_issue_sort.reverse() # Note: In-place reversal
emails_issue_sort = sorted(emails_issue.items(), key=operator.itemgetter(1))
emails_issue_sort.reverse() # Note: In-place reversal
websites_issue_sort = sorted(websites_issue.items(), key=operator.itemgetter(1))
websites_issue_sort.reverse() # Note: In-place reversal
sum_val = sum(streets_issue.values())
print ('Number of street issues: {:,}'.format(sum_val))
print ('Total number of streets: {:,}'.format(counts['total_street']), \
' Percent problems: {:.1%}'.format(float(sum_val)/counts['total_street']))
print ("Street problems: ")
print ( *streets_issue_sort, sep = "\n" )
sum_val = sum(cities_issue.values())
print ('\nNumber of city issues: {:,}'.format(sum_val))
print ('Total number of cities: {:,}'.format(counts['total_city']), \
' Percent problems: {:.1%}'.format(float(sum_val)/counts['total_city']))
print ("City problems: ")
print ( *cities_issue_sort, sep = "\n" )
sum_val = sum(us_states_issue.values())
print ('\nNumber of state issues: {:,}'.format(sum_val))
print ('Total number of states: {:,}'.format(counts['total_state']), \
' Percent problems: {:.1%}'.format(float(sum_val)/counts['total_state']))
print ("State problems: ")
print ( *us_states_issue_sort, sep = "\n" )
sum_val = sum(zipcodes_issue.values())
print ('\nNumber of zipcode issues: {:,}'.format(sum_val))
print ('Total number of zipcodes: {:,}'.format(counts['total_zipcode']), \
' Percent problems: {:.1%}'.format(float(sum_val)/counts['total_zipcode']))
print ("Zip code problems: ")
print ( *zipcodes_issue_sort, sep = "\n" )
sum_val = sum(zips_outside.values())
print ('\nNumber of zipcodes outside Upper West Side: {:,}'.format(sum_val))
print ('Total number of zipcodes: {:,}'.format(counts['total_zipcode']), \
' Percent outside UWS: {:.1%}'.format(float(sum_val)/counts['total_zipcode']))
print ("Zip codes outside UWS: ")
print ( *zips_outside_sort, sep = "\n" )
sum_val = sum(phones_issue.values())
print ('\nNumber of phone number issues: {:,}'.format(sum_val))
print ('Total number of phone numbers: {:,}'.format(counts['total_phone']), \
' Percent problems: {:.1%}'.format(float(sum_val)/counts['total_phone']))
print ("Phone problems: ")
print ( *phones_issue_sort, sep = "\n" )
sum_val = sum(emails_issue.values())
print ('\nNumber of email address issues: {:,}'.format(sum_val))
print ('Total number of email addresses: {:,}'.format(counts['total_email']), \
' Percent problems: {:.1%}'.format(float(sum_val)/counts['total_email']))
print ("email problems: ")
print ( *emails_issue_sort, sep = "\n" )
sum_val = sum(websites_issue.values())
print ('\nNumber of website URL issues: {:,}'.format(sum_val))
print ('Total number of websites: {:,}'.format(counts['total_website']), \
' Percent problems: {:.1%}'.format(float(sum_val)/counts['total_website']))
print ("Website problems: ")
print ( *websites_issue_sort, sep = "\n" )
sum_val = sum(tiger_issue.values())
print ('\nNumber of TIGER issues: {:,}'.format(sum_val))
print ('Total number of TIGER: {:,}'.format(counts['total_tiger']), \
' Percent problems: {:.1%}'.format(float(sum_val)/counts['total_tiger']))
print ("TIGER problems: ")
pp.pprint ( dict(tiger_issue) )
print ( "\nTotal record count: {:,}".format(counts['record_count']) )
return
if __name__ == '__main__':
initial_count_problems()
The data are surprisingly clean! A surprise because the data are largely entered by hand. I suspect the data were previously cleaned.
Note: The full dataset is used above. The reviewer should note that I have manually corrupted the test dataset contained herein to illustrate the functionality of the code.
Although the magnitude of the problem data is small there are opportunities to remedy several issues.
Corrections:
Data quality checks for:
During processing, each element is inspected by examining the key and the value of the element tag. The key identifies the type of element currently in process, and the value is directed to the appropriate function for data correction.
The Open Street Map XML file contains several tags. This study utilizes the <node> and <way> tags in the XML data source. Inside these tags are children <tag> tags that are evaluated.
Data not meeting the quality checks are removed from the dataset before writing the record to the CSV file.
The strategy employed is to correct the data first, if possible. Then eliminate the bad data that is not fixed.
Data validation of the element ID, tag key, and <way node> reference is performed in the build_dictionary_element_tree function before the tag is sent to the data correction function. A further description is provided in the discussion of the dictionary build (see below).
The following data fields are examined and cleaned:
Data quality checks for validity, consistency and uniformity
Example:
<node> or <way>
<tag k="addr:street" v="West 80th Street"/>
</node> or </way>
Checks:
Corrections:
Methodology:
Separate the name of the street from the street description and examine each
Lookup list, mapping dictionary, and Python code (dictionaries)
Data quality checks for accuracy, consistency and uniformity
Example:
<node> or <way>
<tag k="addr:city" v="New York"/>
</node> or </way>
Checks:
Corrections:
Methodology:
Lookup list, mapping dictionary, and Python code (dictionaries)
Data quality checks for accuracy, consistency and uniformity
Example:
<node> or <way>
<tag k="addr:state" v="NY"/>
</node> or </way>
Checks:
Corrections:
Methodology:
Lookup list, Python code (dictionaries)
Data quality checks for validity, consistency and uniformity
Example:
<node> or <way>
<tag k="addr:postcode" v="10025"/>
</node> or </way>
Checks:
Corrections:
Methodology:
Lookup list, Python code (dictionaries)
Data quality checks for validity, consistency and uniformity
Example:
<node> or <way>
<tag k="phone" v="+1-646-630-9652"/>
</node> or </way>
Checks:
Corrections:
Methodology:
Lookup list, regular expression, Python code (dictionaries)
Data quality checks for validity, consistency and uniformity
Example:
<node> or <way>
<tag k="email" v="uws@dannyscycles.com"/>
</node> or </way>
Checks:
Corrections:
Methodology:
Regular expression, lookup list, Python code (dictionaries)
Data quality checks for validity, consistency and uniformity
Example:
<node> or <way>
<tag k="website" or k="url" v="http://elizabethsnyc.com"/>
</node> or </way>
Checks:
Corrections:
Methodology:
Regular expression, lookup list, Python code (dictionaries)
Data quality checks for consistency and uniformity
Example:
<way>
<tag k="tiger:reviewed" v="no"/>
</way>
Checks:
Corrections:
Methodology:
Lookup list, Python code (dictionaries)
Data quality checks for validity, consistency and uniformity
All tag keys
Checks:
Corrections:
<node> key name Specific keys
k="addr:housenumber"
k="amenity"
k="name"
k="cuisine"
k="shop"
k="building"
k="inscription"
Example:
<node> or <way>
<tag k="amenity" v="restaurant"/>
</node> or </way>
Checks:
Corrections:
Methodology: Regular expression, Python code (dictionaries)
Data quality checks for validity
Checks:
Data quality checks for validity
Checks:
Corrupt data is removed from the dataset.
Data checks include:
The automated process is demonstrated in the routine fix_it_demo.py
# Filename: fix_it_demo.py
# Python 3.7
# Purpose: Correct and eliminate problematic data DEMO
import xml.etree.cElementTree as ET
from collections import defaultdict
import re
from email.utils import parseaddr
from urllib.parse import urlparse
import pprint
import operator
pp = pprint.PrettyPrinter(indent=4, width=20)
map_file = "UpperWestSideFull.osm" # File size is 78.2MB
#================================#
# Construct dictionaries #
#================================#
streets_issue = defaultdict(int)
us_states_issue = defaultdict(int)
us_states_problem = defaultdict(int)
cities_issue = defaultdict(int)
cities_problem = defaultdict(int)
phones_issue = defaultdict(int)
emails_issue = defaultdict(int)
websites_issue = defaultdict(int)
zipcodes_issue = defaultdict(int)
tiger_issue = defaultdict(int)
streets_fix = defaultdict(int)
streets_dict = defaultdict(dict)
cities_fix = defaultdict(int)
cities_dict = defaultdict(dict)
us_states_fix = defaultdict(int)
us_states_dict = defaultdict(dict)
zipcodes_fix = defaultdict(int)
zipcodes_dict = defaultdict(dict)
phones_fix = defaultdict(int)
phones_dict = defaultdict(dict)
tiger_fix = defaultdict(int)
tiger_dict = defaultdict(dict)
house_fix = defaultdict(int)
house_dict = defaultdict(dict)
cuisine_fix = defaultdict(int)
cuisine_dict = defaultdict(dict)
counts = defaultdict(int)
value_issue = defaultdict(list)
node_bad_id = defaultdict(int)
way_bad_id = defaultdict(int)
node_bad_keys = defaultdict(int)
way_bad_keys = defaultdict(int)
way_node_reference_bad = defaultdict(int)
#===========================================#
# Initialize lists and dictionaries #
#===========================================#
ok_streets = [ 'Americas', 'Avenue', 'Boulevard', 'Broadway', 'Circle', 'Court', 'Drive', 'East', 'Lane',
'North', 'Parkway', 'Place', 'Plaza', 'Road', 'South', 'Square', 'Street', 'Terrace', 'Walk',
'Way', 'West']
other_city = ['Union City', 'West New York', 'North Bergen', 'Weehawken', 'Long Island City', 'Roosevelt Island',
'Queens', 'Guttenberg', 'Astoria', 'Hoboken', 'Jersey City', 'Morristown']
ok_domains = ['.com', '.org', '.net', '.edu', '.gov', '.us', '.nyc', '.biz', '.info', '.io', '.it',
'.co', '.site', '.cz', '.hu', '.int']
uws_zips = ['10023', '10024', '10025']
abbreviations = ['st', 'nd', 'rd', 'th']
phone_prefixes = ['212 ', '(212', '646 ', '(646', '212-', '646-', '917 ', '(917', '917-', '800 ', '800-',
'(800', '718-', '(888', '845-', '855-']
direction_mapping = { "N.": "North",
"E.": "East",
"S.": "South",
"W.": "West",
"N ": "North ",
"E ": "East ",
"S ": "South ",
"W ": "West "}
street_mapping = { "St": "Street", # Street mapping is specific to this dataset
"St.": "Street",
"street": "Street",
"st": "Street",
"st.": "Street",
"pl": "Place",
"pl.": "Place",
"place": "Place",
"Pl": "Place",
"Pl.": "Place",
"avenue": "Avenue",
"ave": "Avenue",
"ave.": "Avenue",
"Ave": "Avenue",
"Ave.": "Avenue",
"Avene": "Avenue",
"Aveneu": "Avenue",
"Avenue,#392": "Avenue",
"dr": "Drive",
"dr.": "Drive",
"Dr": "Drive",
"Dr.": "Drive",
"N": "North",
"S": "South",
"E": "East",
"W": "West"} # e.g. Central Park West
typo_mapping = {"nwe": "new",
"yoro": "york",
"ykrk": "york",
"ykro": "york"}
#====================================#
# Define regular expressions #
#====================================#
phone_re = re.compile(r"^(\+1\s?-?\(?\)?)(\d{3})\D*(\d{3})\D*(\d{4})$") # r: RE must process backslash as escape
email_re = re.compile(r"[\w.-]+@[\w.-]+")
website_re = re.compile(r"^(http\:\/\/)?(https\:\/\/)?([\w.-])*[\w-]+\.([a-zA-Z][a-zA-Z][a-zA-Z]?[a-zA-Z]?)(\/[\w/.#?=%&,!+()-]*)?$")
basic_re = re.compile(r"^[a-zA-Z0-9'_.,;:=–’>!´é~êçóÃáô®@½·\"\-\(\)\&\/\+\s]+$") # Note: Characters are specific to this dataset
# ================================================== #
# Helper Function #
# ================================================== #
def initialize_demo():
"""Clears the dictionaries and returns a boolean."""
try:
counts.clear()
streets_issue.clear()
us_states_issue.clear()
us_states_problem.clear()
cities_issue.clear()
cities_problem.clear()
phones_issue.clear()
emails_issue.clear()
websites_issue.clear()
zipcodes_issue.clear()
tiger_issue.clear()
value_issue.clear()
streets_fix.clear()
streets_dict.clear()
cities_fix.clear()
cities_dict.clear()
us_states_fix.clear()
us_states_dict.clear()
zipcodes_fix.clear()
zipcodes_dict.clear()
phones_fix.clear()
phones_dict.clear()
tiger_fix.clear()
tiger_dict.clear()
house_fix.clear()
house_dict.clear()
cuisine_fix.clear()
cuisine_dict.clear()
node_bad_id.clear()
way_bad_id.clear()
node_bad_keys.clear()
way_bad_keys.clear()
way_node_reference_bad.clear()
except:
return None
return True
# =================================================================== #
# Functions to correct the values #
# and related helper functions #
# =================================================================== #
def update_street_city(street_city, mapping):
"""Lookup function returns the mapping or None.
Arguments:
street_city -- the item to be mapped
mapping -- the mapping lookup dictionary to use
"""
try:
street_city = mapping[street_city]
except:
print ('Street or City mapping exception!')
return None
return street_city
def street_problem(name, street, node_or_way):
"""Function fix_streets helper eliminates street and returns None.
Arguments:
name -- name of street passed from function fix_streets
street -- street kind passed from function fix_streets
node_or_way -- node_or_way element tag passed from function fix_streets
"""
print (node_or_way, ': Street problem removed from dataset -- name: ', name, ' street: ', street)
streets_issue[street] += 1
counts[node_or_way + ' eliminated'] += 1
return None # Eliminate problematic data from dataset
def fix_streets(name, node_or_way):
"""Correct street dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the addr:street key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace(): # Check for null characters: None, False, '', 0, and ' '
print (node_or_way, ': Street problem removed from dataset -- street is null or whitespace ', name)
streets_issue[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None # Eliminate problematic data from dataset
name = name.strip()
flag = True
try:
name = name.rsplit(' ', 1) # name[0] + name[1] where name[1] is the street
except:
flag = False
if len(name) == 1: # name is a single word e.g. Broadway
street = name[0]
name = name[0]
flag = False
else:
street = name[1]
name = name[0]
if street in street_mapping.keys():
# Standardize street abbreviations
better_street = update_street_city(street, street_mapping)
if better_street:
streets_fix[street] += 1
streets_dict[street][better_street] = streets_fix[street]
# print ('Street fixed: ', street, "=>", better_street)
street = better_street
else:
return street_problem(name, street, node_or_way)
for k in direction_mapping.keys():
if k in name:
# Remove periods e.g. W. 86th => West 86th
# Convert abbreviations e.g. W 79th => West 79th
better_name = name.replace(k, update_street_city(k, direction_mapping))
streets_fix[name] += 1
streets_dict[name][better_name] = streets_fix[name]
# print ('Street name fixed: ', name, "=> name: ", better_name, " street: ", street)
name = better_name
if street.isalnum(): # Alphanumeric characters only
if street not in ok_streets:
if street[-2:] in abbreviations: # Examine the last 2 characters e.g. 86th
print ('Street with issue allowed ... name: ', name, ' street: ', street)
else:
return street_problem(name, street, node_or_way)
else:
return street_problem(name, street, node_or_way)
if flag:
name = name + ' ' + street
return name
def fix_city(name, node_or_way):
"""Correct city dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the addr:city key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': City is null -- removed from dataset ', name)
cities_problem[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None # Eliminate problematic data from dataset
name = name.strip()
# Only alphabetical letters, spaces or ","
if not all(char.isalpha() or char.isspace() or ',' or '.' for char in name):
print (node_or_way, ': City contains problem characters -- removed from dataset ', name)
cities_problem[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None # Eliminate problematic data from dataset
namelow = name.lower()
for k in typo_mapping.keys(): # Fix spelling
if k in namelow:
namelow = namelow.replace(k, update_street_city(k, typo_mapping))
better_name = namelow.title()
cities_fix[name] += 1
cities_dict[name][better_name] = cities_fix[name]
# print ('City spelling fixed: ', name, "=>", better_name)
name = better_name
ok_city = ['New York', 'New York City']
ok_city_lower = ['new york', 'new york city']
if namelow in ok_city_lower and name not in ok_city: # Fix titlecase
better_name = name.title()
cities_fix[name] += 1
cities_dict[name][better_name] = cities_fix[name]
# print ('City title case fixed: ', name, "=>", better_name)
name = better_name
# Fix abbreviations or state e.g. 'New York NY' or 'New York, NY'
if any(word in namelow for word in ['ny', 'nyc', 'nyy']):
better_name = 'New York'
cities_fix[name] += 1
cities_dict[name][better_name] = cities_fix[name]
# print ('City abbreviation fixed: ', name, "=>", better_name)
name = better_name
if namelow == 'west new york' and name != 'West New York': # Fix titlecase
better_name = name.title()
cities_fix[name] += 1
cities_dict[name][better_name] = cities_fix[name]
# print ('City West New York title case fixed: ', name, "=>", better_name)
name = better_name
# Change New York City to New York and fix punctuation e.g. 'New York,' in city name
elif name != 'West New York' and 'new york' in namelow and len(name) > 8:
better_name = name[:8]
cities_fix[name] += 1
cities_dict[name][better_name] = cities_fix[name]
# print ('City extra end characters fixed: ', name, "=>", better_name)
name = better_name
if name not in ok_city: # Identified a problem
cities_issue[name] += 1 # Record the problem
if name not in other_city: # Identified a problem; allow certain cities in NJ
print (node_or_way, ': Problem city -- removed from dataset ', name)
cities_problem[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None # Eliminate problematic data from dataset
return name
def fix_state(name, node_or_way):
"""Correct state dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the addr:state key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': State is null -- removed from dataset ', name)
us_states_problem[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None # Eliminate problematic data from dataset
name = name.strip().upper()
namelow = name.lower()
if '.' in name or ',' in name:
better_name = name.replace('.', '').replace(',', '') # Remove periods or commas e.g. 'N.Y.' => 'NY'
us_states_fix[name] += 1
us_states_dict[name][better_name] = us_states_fix[name]
# print ('State punctuation fixed: ', name, "=>", better_name)
name = better_name
if namelow in ['new york', 'new york city']: # Fix state to NY
better_name = 'NY'
us_states_fix[name] += 1
us_states_dict[name][better_name] = us_states_fix[name]
# print ('State fixed: ', name, "=>", better_name)
name = better_name
if namelow == 'ny' and name != 'NY': # Fix to uppercase
better_name = 'NY'
us_states_fix[name] += 1
us_states_dict[name][better_name] = us_states_fix[name]
# print ('State case fixed: ', name, "=>", better_name)
name = better_name
if 'ny' in namelow and len(name) > 2: # Fixes NYC, NYY, 'NY NY' and similar NY issues
better_name = 'NY'
us_states_fix[name] += 1
us_states_dict[name][better_name] = us_states_fix[name]
# print ('State fixed: ', name, "=>", better_name)
name = better_name
if not name.isalpha(): # string contains only alphabetical characters and no spaces
print (node_or_way, ': State contains problem characters -- removed from dataset ', name)
us_states_problem[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None
if name != 'NY': # Identified a problem
us_states_issue[name] += 1 # Record the problem
if name != 'NJ': # Identified a problem; allow 'NJ' state data
print (node_or_way, ': Problem state removed from dataset ', name)
us_states_problem[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None # Eliminate problematic data from dataset
return name
def zip_test(zip, node_or_way):
"""Function fix_zipcodes helper checks zip code and returns it if valid or returns None to eliminate.
Arguments:
zip -- the zip code to be examined, passed from function fix_zipcodes
node_or_way -- node_or_way element tag passed from function fix_zipcodes
"""
if not zip or zip.isspace():
print (node_or_way, ': Zipcode is null -- removed from dataset ', zip)
zipcodes_issue[zip] += 1
counts[node_or_way + ' eliminated'] += 1
return None
if zip.isdigit() and len(zip) == 5: # zipcode contains only 5 digits
return zip
else:
print (node_or_way, ': Zipcode is not valid -- removed from dataset ', zip)
zipcodes_issue[zip] += 1
counts[node_or_way + ' eliminated'] += 1
return None
def fix_zipcodes(name, node_or_way):
"""Correct zip code dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the addr:postcode key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
name = name.strip()
if '-' in name and len(name) == 10: # Zip+4 strip off the plus 4
better_name = name[:5]
zipcodes_fix[name] += 1
zipcodes_dict[name][better_name] = zipcodes_fix[name]
# print ('Zip+4 fixed: ', name, "=>", better_name)
name = better_name
if 'NY' in name: # Strip out NY
better_name = name[-5:] # From end of string
zipcodes_fix[name] += 1
zipcodes_dict[name][better_name] = zipcodes_fix[name]
# print ('Zip code fixed: ', name, "=>", better_name)
name = better_name
return zip_test(name, node_or_way)
def fix_phone(name, node_or_way):
"""Correct phone number dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the phone key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': Phone is null -- removed from dataset ', name)
phones_issue[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None
name = name.strip()
if name[0] == '+' and name[1] != '1':
better_name = name.replace('+', '+1 ') # Fix '+' without '1'
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if '.' in name:
better_name = name.replace('.', '-') # Replace any periods in phone number with a dash
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if '001' in name[:3]: # Begining of string
better_name = name.replace('001', '+1')
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if '1 ' in name[:2] or '1-' in name[:2]:
better_name = '+1 ' + name[2:]
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if any(prefix in name[:4] for prefix in phone_prefixes):
better_name = '+1 ' + name
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if ('212' in name[:3] or '646' in name[:3]) and name[3].isdigit():
better_name = '+1 ' + name
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if ' ' in name[-4:]:
end = len(name) - 5
better_name = name[:end] + name[-5:].replace(' ', '')
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
match = phone_re.search(name)
if not match:
phones_issue[name] += 1
print (node_or_way, ': Phone problem -- removed from dataset ', name)
counts[node_or_way + ' eliminated'] += 1
return None
return name
def fix_email(name, node_or_way):
"""Checks email address and returns it if valid or returns None if data is eliminated.
Arguments:
name -- the value of the email key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': Email is null -- removed from dataset ', name)
emails_issue[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None
name = name.strip().lower()
first_parse = parseaddr(name) # parseaddr returns an email tuple ('username', 'address')
second_parse = not email_re.search(name) # search() returns None (False) if no match can be found
# If match found, a match object instance is returned
third_parse = not first_parse[1].endswith(tuple(ok_domains))
if first_parse == ('', '') or second_parse or third_parse:
emails_issue[name] += 1
print (node_or_way, ': Email problem -- removed from dataset ', name)
counts[node_or_way + ' eliminated'] += 1
return None
return name
def fix_website(name, node_or_way):
"""Checks website URL and returns it if valid or returns None if data is eliminated.
Arguments:
name -- the value of the website or url key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': Website is null -- removed from dataset ', name)
websites_issue[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None
name = name.strip().lower()
flag_1 = True
first_parse = urlparse(name) # urlparse returns a 6-tuple
if (first_parse.scheme == '') and (first_parse.netloc == '') and (first_parse.path == ''):
flag_1 = False
match = website_re.search(name)
flag_2 = False
for domain in ok_domains:
if domain in name:
flag_2 = True
break
else:
flag_2 = False
if not (match and flag_1 and flag_2):
websites_issue[name] += 1
print (node_or_way, ': Website problem -- removed from dataset ', name)
counts[node_or_way + ' eliminated'] += 1
return None
return name
def fix_tiger_no(name, node_or_way):
"""Correct TIGER dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the tiger:reviewed key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': Tiger reviewed is null -- removed from dataset ', name)
tiger_issue[name] += 1
counts[node_or_way + ' eliminated'] += 1
return None
name = name.strip()
if name in ['; no; no', 'not']:
better_name = 'no'
tiger_fix[name] += 1
tiger_dict[name][better_name] = tiger_fix[name]
# print ('Tiger fixed: ', name, "=>", better_name)
return better_name
stripes = ['yes', 'no', 'aerial']
if name not in stripes:
tiger_issue[name] += 1
print (node_or_way, ': TIGER reviewed problem -- removed from dataset ', name)
counts[node_or_way + ' eliminated'] += 1
return None
return name
def basic_fix(key, value, node_or_way):
"""Correct value dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
key -- the child element tag key
value -- the child element tag value
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not value or value.isspace(): # None, False, '', 0, and ' '
print (node_or_way, ': ', key, ' problem removed from dataset -- value is null or whitespace ', value)
value_issue[key].append(value)
counts[node_or_way + ' eliminated'] += 1
return None
value = value.strip()
if key == "addr:housenumber" and '#' in value:
better_value = value.replace('#', '')
# print (key, ' value ', value, ' changed to ', better_value)
house_fix[value] += 1
house_dict[value][better_value] = house_fix[value]
value = better_value
if key == "cuisine":
if (not value.istitle()) or ('_' in value):
better_value = value.title().replace('_', ' ') # consistent case for values, change '_'
# print (key, ' value ', value, ' changed to ', better_value)
cuisine_fix[value] += 1
cuisine_dict[value][better_value] = cuisine_fix[value]
value = better_value
match = basic_re.findall(value)
if not match:
print (node_or_way, ': ', key, ' problem removed from dataset -- value is not allowed ', value)
value_issue[key].append(value)
counts[node_or_way + ' eliminated'] += 1
return None
return value
# ======================================================= #
# Identity Functions #
# ======================================================= #
def is_housenumber(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "addr:housenumber")
def is_amenity(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "amenity")
def is_name(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "name")
def is_cuisine(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "cuisine")
def is_shop(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "shop")
def is_building(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "building")
def is_street(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "addr:street")
def is_state(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "addr:state")
def is_city(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "addr:city")
def is_zipcode(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "addr:postcode")
def is_phone(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "phone")
def is_email(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "email")
def is_website(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and ( (elem.attrib["k"] == "website") or (elem.attrib["k"] == "url") )
def is_tiger(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "tiger:reviewed")
def is_inscription(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and ("inscription" in elem.attrib['k'])
# ================================================== #
# Main Function #
# ================================================== #
def element_tree(osm_file):
"""Parse XML file data and yield an element tree.
Iteratively step through each top level XML element and read in sections of the XML file
as a tree of the element
Arguments:
osm_file -- the Open Street Map XML file to process
"""
context = ET.iterparse(osm_file, events=('start', 'end'))
_, root = next(context) # root saves a reference to the iterator (block of XML) currently in process
for event, element in context: # the result is an iterable that returns a stream of (event, element) tuples
if event == 'end': # end returns the fully populated element (including children)
yield element # yield returns a generator
root.clear()
del context
return
def fix_it_demo():
"""Steps through each element in a tree, sends the value data out for correction or elimination, and prints a report."""
tags=('node', 'way', 'tag', 'nd')
correct_chars_re = re.compile(r"^[a-zA-Z:\-_1-9]+$")
response = initialize_demo()
if not response:
print ('Fatal Error initializing dictionaries')
print ('\nTerminating execution...')
return None
print("\nDATA ELIMINATIONS\n")
for element in element_tree(map_file):
counts['element count'] += 1
if element == None:
print ('Element is Null')
continue
if element.tag in ['node', 'way'] and not element.attrib['id'].isdigit():
print (element.tag.capitalize(), 'ID is Null or not a number: ', element.attrib['id'])
if element.tag == 'node':
node_bad_id[element.attrib['id']] += 1
else:
way_bad_id[element.attrib['id']] += 1
continue
if element.tag in ['node', 'way']:
if element.tag == 'node':
counts['node count'] += 1
if element.tag == 'way':
counts['way count'] += 1
node_or_way = element.tag.capitalize()
for child in element:
skip = '?'
if child.tag == 'nd':
if child.attrib['ref'].isdigit():
counts['way node count'] += 1
else:
print ('Way Node reference is Null or not a number: ', child.attrib['ref'])
way_node_reference_bad[child.attrib['ref']] += 1
continue
if child.tag != 'tag':
continue
if 'cityracks.' in child.attrib['k']:
child.attrib['k'] = child.attrib['k'].replace('cityracks.','')
m = correct_chars_re.search(child.attrib['k']) # No match returns None
if not m:
print (node_or_way, 'key -- Problem character! ', 'key = ', child.attrib['k'], ' value = ', child.attrib['v'])
if node_or_way == 'Node':
node_bad_keys[child.attrib['k']] += 1
elif node_or_way == 'Way':
way_bad_keys[child.attrib['k']] += 1
continue # eliminate the problematic child tag
if is_housenumber(child):
skip = basic_fix(child.attrib['k'], child.attrib['v'], node_or_way)
if is_amenity(child):
skip = basic_fix(child.attrib['k'], child.attrib['v'], node_or_way)
if is_name(child):
skip = basic_fix(child.attrib['k'], child.attrib['v'], node_or_way)
if is_cuisine(child):
skip = basic_fix(child.attrib['k'], child.attrib['v'], node_or_way)
if is_shop(child):
skip = basic_fix(child.attrib['k'], child.attrib['v'], node_or_way)
if is_building(child):
skip = basic_fix(child.attrib['k'], child.attrib['v'], node_or_way)
if is_street(child):
skip = fix_streets(child.attrib['v'], node_or_way)
if is_state(child):
skip = fix_state(child.attrib["v"], node_or_way)
if is_city(child):
skip = fix_city(child.attrib["v"], node_or_way)
if is_zipcode(child):
skip = fix_zipcodes(child.attrib["v"], node_or_way)
if is_phone(child):
skip = fix_phone(child.attrib["v"], node_or_way)
if is_email(child):
skip = fix_email(child.attrib["v"], node_or_way)
if is_website(child):
skip = fix_website(child.attrib["v"], node_or_way)
if is_tiger(child):
skip = fix_tiger_no(child.attrib["v"], node_or_way)
if is_inscription(child):
skip = basic_fix(child.attrib['k'], child.attrib['v'], node_or_way)
if skip not in [None, '?']:
counts['tags processed'] += 1
if element.tag == 'node':
counts['node tag count'] += 1
if element.tag == 'way':
counts['way tag count'] += 1
if skip == '?':
counts['tags skipped'] += 1
if element.tag == 'node':
counts['node tags skipped'] += 1
if element.tag == 'way':
counts['way tags skipped'] += 1
print_summary_report()
print_detailed_report()
return
# ============================================================================= #
# Functions to print a detailed report of data corrected #
# ============================================================================= #
def print_dict(dic, text):
"""Helper function to print a dictionary formatted and returns None.
Arguments:
dic -- the dictionary to be formatted
text -- wording related to the dictionary
"""
print ('\n' + text + ' fixes:')
total = 0
for key, val in dic.items(): # Python 3 returns a view, NOT a list
value_list = list(val.items())
print (' ', key, ' => ', value_list[0][0], ' :', value_list[0][1])
total += value_list[0][1]
if text == 'City':
text = 'Citie'
if text == 'Tiger no':
text = "Tiger no'"
print (text + 's' + ' fixed:', total)
return
def print_detailed_report():
"""Prints a detailed report of data corrections and eliminations, and returns None."""
streets_issue_sort = sorted(streets_issue.items(), key=operator.itemgetter(1))
streets_issue_sort.reverse() # Note: In-place reversal
cities_issue_sort = sorted(cities_issue.items(), key=operator.itemgetter(1))
cities_issue_sort.reverse() # Note: In-place reversal
cities_problem_sort = sorted(cities_problem.items(), key=operator.itemgetter(1))
cities_problem_sort.reverse() # Note: In-place reversal
us_states_issue_sort = sorted(us_states_issue.items(), key=operator.itemgetter(1))
us_states_issue_sort.reverse() # Note: In-place reversal
us_states_problem_sort = sorted(us_states_problem.items(), key=operator.itemgetter(1))
us_states_problem_sort.reverse() # Note: In-place reversal
zipcodes_issue_sort = sorted(zipcodes_issue.items(), key=operator.itemgetter(1))
zipcodes_issue_sort.reverse() # Note: In-place reversal
phones_issue_sort = sorted(phones_issue.items(), key=operator.itemgetter(1))
phones_issue_sort.reverse() # Note: In-place reversal
emails_issue_sort = sorted(emails_issue.items(), key=operator.itemgetter(1))
emails_issue_sort.reverse() # Note: In-place reversal
websites_issue_sort = sorted(websites_issue.items(), key=operator.itemgetter(1))
websites_issue_sort.reverse() # Note: In-place reversal
print("\n---------------------------------------------------------")
print("CONSOLIDATED DETAILS OF DATA CORRECTIONS AND ELIMINATIONS")
print_dict(streets_dict, 'Street') # Street fixes
sum_val = sum(streets_issue.values())
print ('\nNumber of street issues: {:,}'.format(sum_val), 'streets removed from dataset')
print ("Street problems: ")
print ( *streets_issue_sort, sep = "\n" )
print_dict(cities_dict, 'City') # City fixes
sum_val = sum(cities_issue.values())
print ('\nNumber of cities not in NYC: {:,}'.format(sum_val))
print ("Cities outside NYC: ")
print ( *cities_issue_sort, sep = "\n" )
sum_val = sum(cities_problem.values())
print ('\nNumber of city problems: {:,}'.format(sum_val), 'cities removed from dataset')
print ("City problems: ")
print ( *cities_problem_sort, sep = "\n" )
print_dict(us_states_dict, 'State') # State fixes
sum_val = sum(us_states_issue.values())
print ('\nNumber of state issues: {:,}'.format(sum_val))
print ("States outside NY: ")
print ( *us_states_issue_sort, sep = "\n" )
sum_val = sum(us_states_problem.values())
print ('\nNumber of state problems: {:,}'.format(sum_val), 'states removed from dataset')
print ("State problems: ")
print ( *us_states_problem_sort, sep = "\n" )
print_dict(zipcodes_dict, 'Zip code') # Zip code fixes
sum_val = sum(zipcodes_issue.values())
print ('\nNumber of zipcode issues: {:,}'.format(sum_val), 'zipcodes removed from dataset')
print ("Zipcode problems: ")
print ( *zipcodes_issue_sort, sep = "\n" )
print_dict(phones_dict, 'Phone') # Phone fixes
sum_val = sum(phones_issue.values())
print ('\nNumber of phone number issues: {:,}'.format(sum_val), 'phone numbers removed from dataset')
print ("Phone problems: ")
print ( *phones_issue_sort, sep = "\n" )
sum_val = sum(emails_issue.values())
print ('\nNumber of email address issues: {:,}'.format(sum_val), 'emails removed from dataset')
print ("email problems: ")
print ( *emails_issue_sort, sep = "\n" )
sum_val = sum(websites_issue.values())
print ('\nNumber of website URL issues: {:,}'.format(sum_val), 'websites removed from dataset')
print ("Website problems: ")
print ( *websites_issue_sort, sep = "\n" )
print_dict(tiger_dict, 'Tiger no') # TIGER fixes
sum_val = sum(tiger_issue.values())
print ('\nNumber of TIGER issues: {:,}'.format(sum_val), 'TIGER tags removed from dataset')
print ('(TIGER is not [\'yes\', \'no\', \'aerial\'])')
print ("TIGER problems: ")
pp.pprint ( dict(tiger_issue) )
print_dict(house_dict, 'House number') # House number fixes
print_dict(cuisine_dict, 'Cuisine name') # Cuisine fixes
total = 0
for value in value_issue.values():
total += len(value)
print ('\nNumber of other value issues:', total, 'tags removed from dataset')
print ("Other value problems: ")
pp.pprint ( dict(value_issue) )
print ("\nCorrupt ID's:")
print ("Nodes bad ID's:", sum(node_bad_id.values()), '<nodes> removed from dataset')
pp.pprint ( dict(node_bad_id) )
print ("Ways bad ID's:", sum(way_bad_id.values()), '<ways> removed from dataset')
pp.pprint ( dict(way_bad_id) )
print ('\nCorrupt keys:')
print ('Nodes bad keys:', sum(node_bad_keys.values()), 'tags removed from dataset')
pp.pprint ( dict(node_bad_keys) )
print ('Ways bad keys:', sum(way_bad_keys.values()), 'tags removed from dataset')
pp.pprint ( dict(way_bad_keys) )
print ('\nWays nodes corrupt references: ', sum(way_node_reference_bad.values()), '<ways nodes> removed from dataset')
pp.pprint ( dict(way_node_reference_bad) )
return
# ================================================================================= #
# Functions to print reports of the process #
# ================================================================================= #
def print_summary_report():
"""Prints a summary report of data corrections and eliminations, and returns None."""
print ('\n-------')
print ('SUMMARY')
print ('\nTag counts:')
print (" Nodes: {:,}".format(counts['node count']) )
print (" Ways: {:,}".format(counts['way count']) )
print (" Nodes tags: {:,}".format(counts['node tag count']) )
print (" Ways tags: {:,}".format(counts['way tag count']) )
print (" Ways Nodes: {:,}".format(counts['way node count']) )
print ('\nEliminated tag counts:')
print (' Tags with bad data values')
print (' Nodes tags voided: {:,}'.format(counts['Node eliminated']))
print (' Ways tags voided: {:,}'.format(counts['Way eliminated']))
print ('\n Tags with corrupt ID')
print (' Nodes removed: {:,}'.format(sum(node_bad_id.values())) )
print (' Ways removed: {:,}'.format(sum(way_bad_id.values())) )
print ('\n Tags with corrupt keys')
print (' Nodes tags with key problem: {:,}'.format(sum(node_bad_keys.values())) )
print (' Ways tags with key problem: {:,}'.format(sum(way_bad_keys.values())) )
print ('\n Tags with defective reference')
print (' Ways Nodes tags voided: {:,}'.format(sum(way_node_reference_bad.values())) )
print ('\nTotal tags skipped: {:,}'.format(counts['tags skipped']))
print ( "\nTotal elements processed: {:,}".format(counts['element count']) )
return
#========================#
# Runner #
#========================#
if __name__ == '__main__':
fix_it_demo()
The summary report for the Test dataset is provided below for readers wishing to run the enclosed Python files.

The programmatic routines below are used to perform the data wrangling:
-- fix_it.py
-- element_to_dictionary.py
-- main_process.py
The output are the validated (against the schema) CSV files containing the parsed dataset.
The modules fix_it.py and element_to_dictionary.py cannot be executed independently. They are included in main_process.py.
To create the CSV files, run main_process.py.
# Filename: fix_it.py
# Python 3.7
# Notes:
# This is a module of main_process.py
# Not to be run independently -- Use 'python main_process.py'
# To view a demo of the fix_it routine -- Run 'python fix_it_demo.py'
# Purpose: Correct problematic data
import xml.etree.cElementTree as ET
from collections import defaultdict
import re
from email.utils import parseaddr
from urllib.parse import urlparse
import pprint
import operator
pp = pprint.PrettyPrinter(indent=4, width=20)
#================================#
# Construct dictionaries #
#================================#
streets_issue = defaultdict(int)
us_states_issue = defaultdict(int)
us_states_problem = defaultdict(int)
cities_issue = defaultdict(int)
cities_problem = defaultdict(int)
phones_issue = defaultdict(int)
emails_issue = defaultdict(int)
websites_issue = defaultdict(int)
zipcodes_issue = defaultdict(int)
tiger_issue = defaultdict(int)
streets_fix = defaultdict(int)
streets_dict = defaultdict(dict)
cities_fix = defaultdict(int)
cities_dict = defaultdict(dict)
us_states_fix = defaultdict(int)
us_states_dict = defaultdict(dict)
zipcodes_fix = defaultdict(int)
zipcodes_dict = defaultdict(dict)
phones_fix = defaultdict(int)
phones_dict = defaultdict(dict)
tiger_fix = defaultdict(int)
tiger_dict = defaultdict(dict)
house_fix = defaultdict(int)
house_dict = defaultdict(dict)
cuisine_fix = defaultdict(int)
cuisine_dict = defaultdict(dict)
counts = defaultdict(int)
value_issue = defaultdict(list)
bad_keys = defaultdict(int)
way_id_bad = defaultdict(int)
node_id_bad = defaultdict(int)
way_node_reference_bad = defaultdict(int)
#===========================================#
# Initialize lists and dictionaries #
#===========================================#
ok_streets = [ 'Americas', 'Avenue', 'Boulevard', 'Broadway', 'Circle', 'Court', 'Drive', 'East', 'Lane',
'North', 'Parkway', 'Place', 'Plaza', 'Road', 'South', 'Square', 'Street', 'Terrace', 'Walk',
'Way', 'West']
other_city = ['Union City', 'West New York', 'North Bergen', 'Weehawken', 'Long Island City', 'Roosevelt Island',
'Queens', 'Guttenberg', 'Astoria', 'Hoboken', 'Jersey City', 'Morristown']
ok_domains = ['.com', '.org', '.net', '.edu', '.gov', '.us', '.nyc', '.biz', '.info', '.io', '.it',
'.co', '.site', '.cz', '.hu', '.int']
uws_zips = ['10023', '10024', '10025']
abbreviations = ['st', 'nd', 'rd', 'th']
phone_prefixes = ['212 ', '(212', '646 ', '(646', '212-', '646-', '917 ', '(917', '917-', '800 ', '800-',
'(800', '718-', '(888', '845-', '855-']
direction_mapping = { "N.": "North",
"E.": "East",
"S.": "South",
"W.": "West",
"N ": "North ",
"E ": "East ",
"S ": "South ",
"W ": "West "}
street_mapping = { "St": "Street", # Street mapping is specific to this dataset
"St.": "Street",
"street": "Street",
"st": "Street",
"st.": "Street",
"pl": "Place",
"pl.": "Place",
"place": "Place",
"Pl": "Place",
"Pl.": "Place",
"avenue": "Avenue",
"ave": "Avenue",
"ave.": "Avenue",
"Ave": "Avenue",
"Ave.": "Avenue",
"Avene": "Avenue",
"Aveneu": "Avenue",
"Avenue,#392": "Avenue",
"dr": "Drive",
"dr.": "Drive",
"Dr": "Drive",
"Dr.": "Drive",
"N": "North",
"S": "South",
"E": "East",
"W": "West"} # e.g. Central Park West
typo_mapping = {"nwe": "new",
"yoro": "york",
"ykrk": "york",
"ykro": "york"}
#====================================#
# Define regular expressions #
#====================================#
phone_re = re.compile(r"^(\+1\s?-?\(?\)?)(\d{3})\D*(\d{3})\D*(\d{4})$") # r: RE must process backslash as escape
email_re = re.compile(r"[\w.-]+@[\w.-]+")
website_re = re.compile(r"^(http\:\/\/)?(https\:\/\/)?([\w.-])*[\w-]+\.([a-zA-Z][a-zA-Z][a-zA-Z]?[a-zA-Z]?)(\/[\w/.#?=%&,!+()-]*)?$")
basic_re = re.compile(r"^[a-zA-Z0-9'_.,;:=–’>!´é~êçóÃáô®@½·\"\-\(\)\&\/\+\s]+$") # Note: Characters are specific to this dataset
# ================================================== #
# Helper Function #
# ================================================== #
# Clears the global dictionaries
def initialize():
"""Clears the dictionaries and returns a boolean."""
try:
counts.clear()
streets_issue.clear()
us_states_issue.clear()
us_states_problem.clear()
cities_issue.clear()
cities_problem.clear()
phones_issue.clear()
emails_issue.clear()
websites_issue.clear()
zipcodes_issue.clear()
tiger_issue.clear()
value_issue.clear()
streets_fix.clear()
streets_dict.clear()
cities_fix.clear()
cities_dict.clear()
us_states_fix.clear()
us_states_dict.clear()
zipcodes_fix.clear()
zipcodes_dict.clear()
phones_fix.clear()
phones_dict.clear()
tiger_fix.clear()
tiger_dict.clear()
house_fix.clear()
house_dict.clear()
cuisine_fix.clear()
cuisine_dict.clear()
bad_keys.clear()
way_id_bad.clear()
node_id_bad.clear()
way_node_reference_bad.clear()
except:
return None
return True
# =================================================================== #
# Functions to correct the values #
# and related helper functions #
# =================================================================== #
def update_street_city(street_city, mapping):
"""Lookup function returns the mapping or None.
Arguments:
street_city -- the item to be mapped
mapping -- the mapping lookup dictionary to use
"""
try:
street_city = mapping[street_city]
except:
print ('Street or City mapping exception!')
return None
return street_city
def street_problem(name, street, node_or_way):
"""Function fix_streets helper eliminates street and returns None.
Arguments:
name -- name of street passed from function fix_streets
street -- street kind passed from function fix_streets
node_or_way -- node_or_way element tag passed from function fix_streets
"""
print (node_or_way, ': Street problem removed from dataset -- name: ', name, ' street: ', street)
streets_issue[street] += 1
counts['value eliminated'] += 1
return None # Eliminate problematic data from dataset
def fix_streets(name, node_or_way):
"""Correct street dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the addr:street key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace(): # Check for null characters: None, False, '', 0, and ' '
print (node_or_way, ': Street problem removed from dataset -- street is null or whitespace ', name)
streets_issue[name] += 1
counts['value eliminated'] += 1
return None # Eliminate problematic data from dataset
name = name.strip()
flag = True
try:
name = name.rsplit(' ', 1) # name[0] + name[1] where name[1] is the street
except:
flag = False
if len(name) == 1: # name is a single word e.g. Broadway
street = name[0]
name = name[0]
flag = False
else:
street = name[1]
name = name[0]
if street in street_mapping.keys():
# Standardize street abbreviations
better_street = update_street_city(street, street_mapping)
if better_street:
streets_fix[street] += 1
streets_dict[street][better_street] = streets_fix[street]
# print ('Street fixed: ', street, "=>", better_street)
street = better_street
else:
return street_problem(name, street, node_or_way)
for k in direction_mapping.keys():
if k in name:
# Remove periods e.g. W. 86th => West 86th
# Convert abbreviations e.g. W 79th => West 79th
better_name = name.replace(k, update_street_city(k, direction_mapping))
streets_fix[name] += 1
streets_dict[name][better_name] = streets_fix[name]
# print ('Street name fixed: ', name, "=> name: ", better_name, " street: ", street)
name = better_name
if street.isalnum(): # Alphanumeric characters only
if street not in ok_streets:
if street[-2:] in abbreviations: # Examine the last 2 characters e.g. 86th
print ('Street with issue allowed ... name: ', name, ' street: ', street)
else:
return street_problem(name, street, node_or_way)
else:
return street_problem(name, street, node_or_way)
if flag:
name = name + ' ' + street
return name
def fix_city(name, node_or_way):
"""Correct city dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the addr:city key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': City is null -- removed from dataset ', name)
cities_problem[name] += 1
counts['value eliminated'] += 1
return None # Eliminate problematic data from dataset
name = name.strip()
# Only alphabetical letters, spaces or ","
if not all(char.isalpha() or char.isspace() or ',' or '.' for char in name):
print (node_or_way, ': City contains problem characters -- removed from dataset ', name)
cities_problem[name] += 1
counts['value eliminated'] += 1
return None # Eliminate problematic data from dataset
namelow = name.lower()
for k in typo_mapping.keys(): # Fix spelling
if k in namelow:
namelow = namelow.replace(k, update_street_city(k, typo_mapping))
better_name = namelow.title()
cities_fix[name] += 1
cities_dict[name][better_name] = cities_fix[name]
# print ('City spelling fixed: ', name, "=>", better_name)
name = better_name
ok_city = ['New York', 'New York City']
ok_city_lower = ['new york', 'new york city']
if namelow in ok_city_lower and name not in ok_city: # Fix titlecase
better_name = name.title()
cities_fix[name] += 1
cities_dict[name][better_name] = cities_fix[name]
# print ('City title case fixed: ', name, "=>", better_name)
name = better_name
# Fix abbreviations or state e.g. 'New York NY' or 'New York, NY'
if any(word in namelow for word in ['ny', 'nyc', 'nyy']):
better_name = 'New York'
cities_fix[name] += 1
cities_dict[name][better_name] = cities_fix[name]
# print ('City abbreviation fixed: ', name, "=>", better_name)
name = better_name
if namelow == 'west new york' and name != 'West New York': # Fix titlecase
better_name = name.title()
cities_fix[name] += 1
cities_dict[name][better_name] = cities_fix[name]
# print ('City West New York title case fixed: ', name, "=>", better_name)
name = better_name
# Change New York City to New York and fix punctuation e.g. 'New York,' in city name
elif name != 'West New York' and 'new york' in namelow and len(name) > 8:
better_name = name[:8]
cities_fix[name] += 1
cities_dict[name][better_name] = cities_fix[name]
# print ('City extra end characters fixed: ', name, "=>", better_name)
name = better_name
if name not in ok_city: # Identified a problem
cities_issue[name] += 1 # Record the problem
if name not in other_city: # Identified a problem; allow certain cities in NJ
print (node_or_way, ': Problem city -- removed from dataset ', name)
cities_problem[name] += 1
counts['value eliminated'] += 1
return None # Eliminate problematic data from dataset
return name
def fix_state(name, node_or_way):
"""Correct state dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the addr:state key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': State is null -- removed from dataset ', name)
us_states_problem[name] += 1
counts['value eliminated'] += 1
return None # Eliminate problematic data from dataset
name = name.strip().upper()
namelow = name.lower()
if '.' in name or ',' in name:
better_name = name.replace('.', '').replace(',', '') # Remove periods or commas e.g. 'N.Y.' => 'NY'
us_states_fix[name] += 1
us_states_dict[name][better_name] = us_states_fix[name]
# print ('State punctuation fixed: ', name, "=>", better_name)
name = better_name
if namelow in ['new york', 'new york city']: # Fix state to NY
better_name = 'NY'
us_states_fix[name] += 1
us_states_dict[name][better_name] = us_states_fix[name]
# print ('State fixed: ', name, "=>", better_name)
name = better_name
if namelow == 'ny' and name != 'NY': # Fix to uppercase
better_name = 'NY'
us_states_fix[name] += 1
us_states_dict[name][better_name] = us_states_fix[name]
# print ('State case fixed: ', name, "=>", better_name)
name = better_name
if 'ny' in namelow and len(name) > 2: # Fixes NYC, NYY, 'NY NY' and similar NY issues
better_name = 'NY'
us_states_fix[name] += 1
us_states_dict[name][better_name] = us_states_fix[name]
# print ('State fixed: ', name, "=>", better_name)
name = better_name
if not name.isalpha(): # string contains only alphabetical characters and no spaces
print (node_or_way, ': State contains problem characters -- removed from dataset ', name)
us_states_problem[name] += 1
counts['value eliminated'] += 1
return None
if name != 'NY': # Identified a problem
us_states_issue[name] += 1 # Record the problem
if name != 'NJ': # Identified a problem; allow 'NJ' state data
print (node_or_way, ': Problem state removed from dataset ', name)
us_states_problem[name] += 1
counts['value eliminated'] += 1
return None # Eliminate problematic data from dataset
return name
def zip_test(zip, node_or_way):
"""Function fix_zipcodes helper checks zip code and returns it if valid or returns None to eliminate.
Arguments:
zip -- the zip code to be examined, passed from function fix_zipcodes
node_or_way -- node_or_way element tag passed from function fix_zipcodes
"""
if not zip or zip.isspace():
print (node_or_way, ': Zipcode is null -- removed from dataset ', zip)
zipcodes_issue[zip] += 1
counts['value eliminated'] += 1
return None
if zip.isdigit() and len(zip) == 5: # zipcode contains only 5 digits
return zip
else:
print (node_or_way, ': Zipcode is not valid -- removed from dataset ', zip)
zipcodes_issue[zip] += 1
counts['value eliminated'] += 1
return None
def fix_zipcodes(name, node_or_way):
"""Correct zip code dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the addr:postcode key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
name = name.strip()
if '-' in name and len(name) == 10: # Zip+4 strip off the plus 4
better_name = name[:5]
zipcodes_fix[name] += 1
zipcodes_dict[name][better_name] = zipcodes_fix[name]
# print ('Zip+4 fixed: ', name, "=>", better_name)
name = better_name
if 'NY' in name: # Strip out NY
better_name = name[-5:] # From end of string
zipcodes_fix[name] += 1
zipcodes_dict[name][better_name] = zipcodes_fix[name]
# print ('Zip code fixed: ', name, "=>", better_name)
name = better_name
return zip_test(name, node_or_way)
def fix_phone(name, node_or_way):
"""Correct phone number dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the phone key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': Phone is null -- removed from dataset ', name)
phones_issue[name] += 1
counts['value eliminated'] += 1
return None
name = name.strip()
if name[0] == '+' and name[1] != '1':
better_name = name.replace('+', '+1 ') # Fix '+' without '1'
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if '.' in name:
better_name = name.replace('.', '-') # Replace any periods in phone number with a dash
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if '001' in name[:3]: # Begining of string
better_name = name.replace('001', '+1')
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if '1 ' in name[:2] or '1-' in name[:2]:
better_name = '+1 ' + name[2:]
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if any(prefix in name[:4] for prefix in phone_prefixes):
better_name = '+1 ' + name
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if ('212' in name[:3] or '646' in name[:3]) and name[3].isdigit():
better_name = '+1 ' + name
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
if ' ' in name[-4:]:
end = len(name) - 5
better_name = name[:end] + name[-5:].replace(' ', '')
phones_fix[name] += 1
phones_dict[name][better_name] = phones_fix[name]
# print ('Phone fixed: ', name, "=>", better_name)
name = better_name
match = phone_re.search(name)
if not match:
phones_issue[name] += 1
print (node_or_way, ': Phone problem -- removed from dataset ', name)
counts['value eliminated'] += 1
return None
return name
def fix_email(name, node_or_way):
"""Checks email address and returns it if valid or returns None if data is eliminated.
Arguments:
name -- the value of the email key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': Email is null -- removed from dataset ', name)
emails_issue[name] += 1
counts['value eliminated'] += 1
return None
name = name.strip().lower()
first_parse = parseaddr(name)
second_parse = not email_re.search(name) # search() returns None (False) if no match can be found
# If match found, a match object instance is returned
third_parse = not first_parse[1].endswith(tuple(ok_domains))
if first_parse == ('', '') or second_parse or third_parse:
emails_issue[name] += 1
print (node_or_way, ': Email problem -- removed from dataset ', name)
counts['value eliminated'] += 1
return None
return name
def fix_website(name, node_or_way):
"""Checks website URL and returns it if valid or returns None if data is eliminated.
Arguments:
name -- the value of the website or url key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': Website is null -- removed from dataset ', name)
websites_issue[name] += 1
counts['value eliminated'] += 1
return None
name = name.strip().lower()
flag_1 = True
first_parse = urlparse(name)
if (first_parse.scheme == '') and (first_parse.netloc == '') and (first_parse.path == ''):
flag_1 = False
match = website_re.search(name)
flag_2 = False
for domain in ok_domains:
if domain in name:
flag_2 = True
break
else:
flag_2 = False
if not (match and flag_1 and flag_2):
websites_issue[name] += 1
print (node_or_way, ': Website problem -- removed from dataset ', name)
counts['value eliminated'] += 1
return None
return name
def fix_tiger_no(name, node_or_way):
"""Correct TIGER dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
name -- the value of the tiger:reviewed key
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not name or name.isspace():
print (node_or_way, ': Tiger reviewed is null -- removed from dataset ', name)
tiger_issue[name] += 1
counts['value eliminated'] += 1
return None
name = name.strip()
if name in ['; no; no', 'not']:
better_name = 'no'
tiger_fix[name] += 1
tiger_dict[name][better_name] = tiger_fix[name]
# print ('Tiger fixed: ', name, "=>", better_name)
return better_name
stripes = ['yes', 'no', 'aerial']
if name not in stripes:
tiger_issue[name] += 1
print (node_or_way, ': TIGER reviewed problem -- removed from dataset ', name)
counts['value eliminated'] += 1
return None
return name
def basic_fix(key, value, node_or_way):
"""Correct value dirty data if possible and return corrected value or None if data is eliminated.
Arguments:
key -- the child element tag key
value -- the child element tag value
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if not value or value.isspace(): # None, False, '', 0, and ' '
print (node_or_way, ': ', key, ' problem removed from dataset -- value is null or whitespace ', value)
value_issue[key].append(value)
counts['value eliminated'] += 1
return None
value = value.strip()
if key == "addr:housenumber" and '#' in value:
better_value = value.replace('#', '')
# print (key, ' value ', value, ' changed to ', better_value)
house_fix[value] += 1
house_dict[value][better_value] = house_fix[value]
value = better_value
if key == "cuisine":
if (not value.istitle()) or ('_' in value):
better_value = value.title().replace('_', ' ') # consistent case for values, change '_'
# print (key, ' value ', value, ' changed to ', better_value)
cuisine_fix[value] += 1
cuisine_dict[value][better_value] = cuisine_fix[value]
value = better_value
match = basic_re.findall(value)
if not match:
print (node_or_way, ': ', key, ' problem removed from dataset -- value is not allowed ', value)
value_issue[key].append(value)
counts['value eliminated'] += 1
return None
return value
# ======================================================= #
# Identity Functions #
# ======================================================= #
def is_housenumber(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "addr:housenumber")
def is_amenity(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "amenity")
def is_name(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "name")
def is_cuisine(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "cuisine")
def is_shop(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "shop")
def is_building(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "building")
def is_street(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib['k'] == "addr:street")
def is_state(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "addr:state")
def is_city(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "addr:city")
def is_zipcode(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "addr:postcode")
def is_phone(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "phone")
def is_email(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "email")
def is_website(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and ( (elem.attrib["k"] == "website") or (elem.attrib["k"] == "url") )
def is_tiger(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and (elem.attrib["k"] == "tiger:reviewed")
def is_inscription(elem):
"""Examine the key of the element tag and return a boolean.
Arguments:
elem -- the current XML element in the Element Tree iteration
"""
return (elem.tag == "tag") and ("inscription" in elem.attrib['k'])
# ================================================== #
# Main Function #
# ================================================== #
def fixer(element, node_or_way):
"""Sends the value data out for parsing and returns the corrected value, or None if eliminated,
or skips the element.
Arguments:
element -- the child element tag value
node_or_way -- Indicates XML element tag is a <node> or <way>
"""
if is_housenumber(element):
return basic_fix(element.attrib['k'], element.attrib['v'], node_or_way)
if is_amenity(element):
return basic_fix(element.attrib['k'], element.attrib['v'], node_or_way)
if is_name(element):
return basic_fix(element.attrib['k'], element.attrib['v'], node_or_way)
if is_cuisine(element):
return basic_fix(element.attrib['k'], element.attrib['v'], node_or_way)
if is_shop(element):
return basic_fix(element.attrib['k'], element.attrib['v'], node_or_way)
if is_building(element):
return basic_fix(element.attrib['k'], element.attrib['v'], node_or_way)
if is_street(element):
return fix_streets(element.attrib['v'], node_or_way)
if is_state(element):
return fix_state(element.attrib["v"], node_or_way)
if is_city(element):
return fix_city(element.attrib["v"], node_or_way)
if is_zipcode(element):
return fix_zipcodes(element.attrib["v"], node_or_way)
if is_phone(element):
return fix_phone(element.attrib["v"], node_or_way)
if is_email(element):
return fix_email(element.attrib["v"], node_or_way)
if is_website(element):
return fix_website(element.attrib["v"], node_or_way)
if is_tiger(element):
return fix_tiger_no(element.attrib["v"], node_or_way)
if is_inscription(element):
return basic_fix(element.attrib['k'], element.attrib['v'], node_or_way)
return '$skip' # Ignore the elements not listed above
# ========================================================================= #
# Functions to print reports of data corrected or eliminated #
# ========================================================================= #
def print_dictionary(dic, text):
"""Helper function to print a dictionary formatted and returns None.
Arguments:
dic -- the dictionary to be formatted
text -- wording related to the dictionary
"""
print ('\n' + text + ' fixes:')
total = 0
for key, val in dic.items(): # Python 3 returns a view, NOT a list
value_list = list(val.items())
print (' ', key, ' => ', value_list[0][0], ' :', value_list[0][1])
total += value_list[0][1]
if text == 'City':
text = 'Citie'
if text == 'Tiger no':
text = "Tiger no'"
print (text + 's' + ' fixed:', total)
return
def print_detailed_fixes(counts):
"""Prints a detailed report of data corrections and eliminations, and returns None.
Arguments:
counts -- a dictionary of the number of data issues
"""
streets_issue_sort = sorted(streets_issue.items(), key=operator.itemgetter(1))
streets_issue_sort.reverse() # Note: In-place reversal
cities_issue_sort = sorted(cities_issue.items(), key=operator.itemgetter(1))
cities_issue_sort.reverse() # Note: In-place reversal
cities_problem_sort = sorted(cities_problem.items(), key=operator.itemgetter(1))
cities_problem_sort.reverse() # Note: In-place reversal
us_states_issue_sort = sorted(us_states_issue.items(), key=operator.itemgetter(1))
us_states_issue_sort.reverse() # Note: In-place reversal
us_states_problem_sort = sorted(us_states_problem.items(), key=operator.itemgetter(1))
us_states_problem_sort.reverse() # Note: In-place reversal
zipcodes_issue_sort = sorted(zipcodes_issue.items(), key=operator.itemgetter(1))
zipcodes_issue_sort.reverse() # Note: In-place reversal
phones_issue_sort = sorted(phones_issue.items(), key=operator.itemgetter(1))
phones_issue_sort.reverse() # Note: In-place reversal
emails_issue_sort = sorted(emails_issue.items(), key=operator.itemgetter(1))
emails_issue_sort.reverse() # Note: In-place reversal
websites_issue_sort = sorted(websites_issue.items(), key=operator.itemgetter(1))
websites_issue_sort.reverse() # Note: In-place reversal
bad_keys_sort = sorted(bad_keys.items(), key=operator.itemgetter(1))
bad_keys_sort.reverse() # Note: In-place reversal
print("\n---------------------------------------------------------")
print("CONSOLIDATED DETAILS OF DATA CORRECTIONS AND ELIMINATIONS")
print_dictionary(streets_dict, 'Street') # Street fixes
sum_val = sum(streets_issue.values())
print ('\nNumber of street issues: {:,}'.format(sum_val), 'streets removed from dataset')
print ("Street problems: ")
print ( *streets_issue_sort, sep = "\n" )
print_dictionary(cities_dict, 'City') # City fixes
sum_val = sum(cities_issue.values())
print ('\nNumber of cities not in NYC: {:,}'.format(sum_val))
print ("Cities outside NYC: ")
print ( *cities_issue_sort, sep = "\n" )
sum_val = sum(cities_problem.values())
print ('\nNumber of city problems: {:,}'.format(sum_val), 'cities removed from dataset')
print ("City problems: ")
print ( *cities_problem_sort, sep = "\n" )
print_dictionary(us_states_dict, 'State') # State fixes
sum_val = sum(us_states_issue.values())
print ('\nNumber of state issues: {:,}'.format(sum_val))
print ("States outside NY: ")
print ( *us_states_issue_sort, sep = "\n" )
sum_val = sum(us_states_problem.values())
print ('\nNumber of state problems: {:,}'.format(sum_val), 'states removed from dataset')
print ("State problems: ")
print ( *us_states_problem_sort, sep = "\n" )
print_dictionary(zipcodes_dict, 'Zip code') # Zip code fixes
sum_val = sum(zipcodes_issue.values())
print ('\nNumber of zipcode issues: {:,}'.format(sum_val), 'zipcodes removed from dataset')
print ("Zipcode problems: ")
print ( *zipcodes_issue_sort, sep = "\n" )
print_dictionary(phones_dict, 'Phone') # Phone fixes
sum_val = sum(phones_issue.values())
print ('\nNumber of phone number issues: {:,}'.format(sum_val), 'phone numbers removed from dataset')
print ("Phone problems: ")
print ( *phones_issue_sort, sep = "\n" )
sum_val = sum(emails_issue.values())
print ('\nNumber of email address issues: {:,}'.format(sum_val), 'emails removed from dataset')
print ("email problems: ")
print ( *emails_issue_sort, sep = "\n" )
sum_val = sum(websites_issue.values())
print ('\nNumber of website URL issues: {:,}'.format(sum_val), 'websites removed from dataset')
print ("Website problems: ")
print ( *websites_issue_sort, sep = "\n" )
print_dictionary(tiger_dict, 'Tiger no') # TIGER fixes
sum_val = sum(tiger_issue.values())
print ('\nNumber of TIGER issues: {:,}'.format(sum_val), 'TIGER tags removed from dataset')
print ('(TIGER is not [\'yes\', \'no\', \'aerial\'])')
print ("TIGER problems: ")
pp.pprint ( dict(tiger_issue) )
print_dictionary(house_dict, 'House number') # House number fixes
print_dictionary(cuisine_dict, 'Cuisine name') # Cuisine fixes
total = 0
for value in value_issue.values():
total += len(value)
print ('\nNumber of other value issues: ', total, 'tags removed from dataset')
print ("Other value problems: ")
pp.pprint ( dict(value_issue) )
n = counts['node child key eliminated'] + counts['way child key eliminated']
print ('\n<nodes><tags> and <ways><tags> corrupted keys: {:,}'.format(n), 'tags removed from dataset')
print ('Tag key problems:')
print ( *bad_keys_sort, sep = "\n" )
total = sum(node_id_bad.values())
print ('\nNumber of <nodes> eliminated: ', total, 'nodes removed from dataset')
print ("Nodes ID problems: ")
pp.pprint ( dict(node_id_bad) )
total = sum(way_id_bad.values())
print ('\nNumber of <ways> eliminated: ', total, 'ways removed from dataset')
print ("Ways ID problems: ")
pp.pprint ( dict(way_id_bad) )
total = sum(way_node_reference_bad.values())
print ('\nNumber of <ways nodes> eliminated: ', total, 'ways nodes removed from dataset')
print ("Ways nodes reference problems: ")
pp.pprint ( dict(way_node_reference_bad) )
return
The process for correcting or eliminating data values is explained above (fix_it_demo.py). What follows is a description of the surrounding methodology that is the data engine.
To review: the Open Street Map data is provided in an XML document format file. That file is read, the data parsed, cleaned, and stored into tabular data structures. It is then written into CSV format files. Later, the CSV file data are imported into an SQL database for data analysis.
XML is an inherently hierarchical data format, and the most natural way to represent it is with a tree. Python's ElementTree has two classes for this purpose -- ElementTree represents the whole XML document as a tree, and Element represents a single node in this tree. Interactions with the whole document (reading and writing to/from files) are usually done on the ElementTree level. Interactions with a single XML element and its sub-elements are done on the Element level.
The ElementTree wrapper type adds code to load XML files as trees of Element objects, and save them back again. The Element type is a simple but flexible container object, designed to store hierarchical data structures, such as simplified XML infosets, in memory.
Referring to the file main_process.py (see the second cell below), the get_element_tree function utilizes the ElementTree iterparse function to iteratively step through each top level XML element and read in sections of the XML file as a tree of the element, for example:
. . . -- element tree
<node> -- top level XML element
<tag ... /> -- children tags (sub-elements)
</node>
. . .
The element tree is returned as a generator and the memory space of the block is cleared. This allows huge files to be incrementally processed without saving the entire file in memory.
Referring to the file element_to_dictionary.py (see the cell below), the build_dictionary_element_tree function treats <node> element trees separately from <way> element trees. For the <node> elements, a dictionary of the <node> attributes is created. For each of the child <tag> tags, the tag value is sent to the fixer function (module fix_it.py) for correction or elimination. A list of the node tags is created.
Similarly for the <way> elements, a dictionary of the <way> attributes is created and the child <tag> values are sent to the fixer function. Lists of way tags and way-nodes are created.
The build_dictionary_element_tree function returns a dictionary of the attributes and tags for the <node> or <way> element tree.
Back to the file main_process.py (see the second cell below), the returned dictionaries are validated using the Cerberus library against the database schema. CSV files are created and each element and tag are written as a row in the appropriate CSV file.
The function takes as input an iterparse Element object (element tree) and returns a dictionary.
The function validates the element attribute 'id' is digits only. If the 'id' is null or not an integer, the element is eliminated from the dataset. The 'id' must be valid because it serves as the primary key in the SQL database.
For each tag, the key is validated to contain correct characters. The tags with keys that contain incorrect characters are removed from the dataset. This approach improves run-time performance -- there is no need to send tags with invalid keys to the data correction engine.
If the tag is a <way node>, the reference attribute 'ref' is validated to contain digits only. If the reference is null or not an integer, the tag is eliminated from the dataset. The 'ref' must be valid because it serves as a foreign key in the SQL database.
<node>:¶The dictionary returned has the format {"node": .., "node_tags": ...}
The "node" item is a key and its value holds a dictionary of the following top level node attributes (dictionary within a dictionary):
All other attributes are ignored.
The "node_tags" item is a key and its value holds a list of dictionaries, one per secondary tag. Secondary tags are child tags of the node which have the tag name/type: <tag>. Each dictionary has the following
fields from the secondary tag attributes:
Additionally,
if there are additional ":" in the "k" attribute they are ignored and kept as part of the tag key. For example:
<tag k="addr:street:name" v="Lincoln"/>
is turned into
{'id': 12345, 'key': 'street:name', 'value': 'Lincoln', 'type': 'addr'}
The final return value for a "node" element looks like:
{'node': {'id': 757860928,
'user': 'uboot',
'uid': 26299,
'version': '2',
'lat': 41.9747374,
'lon': -87.6920102,
'timestamp': '2010-07-22T16:16:51Z',
'changeset': 5288876},
'node_tags': [{'id': 757860928,
'key': 'amenity',
'value': 'fast_food',
'type': 'regular'},
{'id': 757860928,
'key': 'cuisine',
'value': 'sausage',
'type': 'regular'},
{'id': 757860928,
'key': 'name',
'value': "Shelly's Tasty Freeze",
'type': 'regular'}]}
<way>:¶The dictionary has the format {"way": ..., "way_tags": ..., "way_nodes": ...}
The "way" item is a key and its value holds a dictionary of the following top level way attributes (dictionary of dictionaries):
All other attributes are ignored.
The "way_tags" item is a key and its value holds a list of dictionaries, following the exact same rules as for "node_tags".
Additionally, the returned dictionary has an item "way_nodes". "way_nodes" is a key and its value holds a list of dictionaries, one for each "nd" child tag. Each dictionary has the fields:
<nd> tag<nd> tag i.e. what order the <nd> tag appears within the way elementThe final return value for a "way" element looks like:
{'way': {'id': 209809850,
'user': 'chicago-buildings',
'uid': 674454,
'version': '1',
'timestamp': '2013-03-13T15:58:04Z',
'changeset': 15353317},
'way_nodes': [{'id': 209809850, 'node_id': 2199822281, 'position': 0},
{'id': 209809850, 'node_id': 2199822390, 'position': 1},
{'id': 209809850, 'node_id': 2199822392, 'position': 2},
{'id': 209809850, 'node_id': 2199822369, 'position': 3},
{'id': 209809850, 'node_id': 2199822370, 'position': 4},
{'id': 209809850, 'node_id': 2199822284, 'position': 5},
{'id': 209809850, 'node_id': 2199822281, 'position': 6}],
'way_tags': [{'id': 209809850,
'key': 'housenumber',
'type': 'addr',
'value': '1412'},
{'id': 209809850,
'key': 'street',
'type': 'addr',
'value': 'West Lexington St.'},
{'id': 209809850,
'key': 'street:name',
'type': 'addr',
'value': 'Lexington'},
{'id': '209809850',
'key': 'street:prefix',
'type': 'addr',
'value': 'West'},
{'id': 209809850,
'key': 'street:type',
'type': 'addr',
'value': 'Street'},
{'id': 209809850,
'key': 'building',
'type': 'regular',
'value': 'yes'},
{'id': 209809850,
'key': 'levels',
'type': 'building',
'value': '1'},
{'id': 209809850,
'key': 'building_id',
'type': 'chicago',
'value': '366409'}]}
# Filename: element_to_dictionary.py
# Python 3.7
# Notes:
# This is a module of main_process.py
# Not to be run independently -- Use 'python main_process.py'
# Purpose: Convert the element to a dictionary
# Each element in the XML file is sent here to the "build_dictionary_element" function.
# The element is examined and for each child in the element, the child 'value' is sent
# to the "fix_it" function (in file "fix_it.py") for correcting.
# A Python dictionary is constructed and returned back to the "process_xml_elements" function
# (in file "main_process.py") for writing into the csv files.
import pprint
import re
import xml.etree.cElementTree as ET
#========================================================#
# Define regular expression and initialize lists #
#========================================================#
correct_chars_re = re.compile(r"^[a-zA-Z:\-_1-9]+$")
# Be sure the field order in the csv files matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
# ==================================================== #
# Helper Function #
# ==================================================== #
def check_id(id):
"""Helper function to check the ID, and returns it or None if the ID is corrupt.
Arguments:
id -- the ID number attribute of the element tags <node> or <way>
"""
id = id.strip()
if id and id.isdigit(): # id must only be a number
return id
else:
return None
# ================================================================= #
# Function to build a dictionary from the element tree #
# ================================================================= #
def build_dictionary_element_tree(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
default_tag_type='regular'):
"""Function takes an iterparse Element object (element tree) as an input and returns a dictionary.
Checks ID, key and reference
Sends children <tags> to the function fix_it for data value correction or elimination
Saves <node> or <way> XML element tree to a Python dictionary
Arguments:
element -- the current element tree in the XML file iteration
node_attr_fields -- list of attributes for <node> tag
way_attr_fields -- list of attributes for <way> tag
default_tag_type -- set to 'regular'
"""
node_attribs = {}
way_attribs = {}
way_nodes = []
tags = [] # Handle secondary tags the same way for both node and way elements
if element == None:
print ('Element is Null')
return None
if element.tag == 'node':
check = check_id(element.attrib['id'])
if not check:
print ('Node ID is Null or not a number: ', element.attrib['id'])
node_id_bad[element.attrib['id']] += 1
return None
for attr in element.attrib:
if attr in node_attr_fields:
node_attribs[attr] = element.attrib[attr]
for child in element:
temp = { }
if 'cityracks.' in child.attrib['k']:
child.attrib['k'] = child.attrib['k'].replace('cityracks.','')
m = correct_chars_re.search(child.attrib['k']) # No match returns None
if not m:
print ('Node key -- Problem character! ', 'key = ', child.attrib['k'], ' value = ', child.attrib['v'])
counts['node child key eliminated'] += 1
infoKey = 'node key: ' + child.attrib['k']
bad_keys[infoKey] += 1
continue # eliminate the problematic child tag
# Fix value
fixed = fixer(child, 'Node') # Correct or eliminate the child <tag> value
# Function fix_it returns None if there is a data problem
if fixed == '$skip':
counts['node tag skipped'] += 1
continue
if not fixed:
counts['node child value eliminated'] += 1
continue # Eliminate this child tag
else:
temp['id'] = element.attrib['id'] # Save the fixed child tag for writing into csv file
temp['value'] = fixed
if ':' in child.attrib['k']:
k = child.attrib['k'].split(':',1)
temp['type'] = k[0]
temp['key'] = k[1]
else:
temp['key'] = child.attrib['k']
temp['type'] = default_tag_type
counts['node tag count'] += 1 # count the child tags not eliminated
tags.append(temp)
return {'node': node_attribs, 'node_tags': tags}
elif element.tag == 'way':
check = check_id(element.attrib['id'])
if not check:
print ('Way ID is Null or not a number: ', element.attrib['id'])
way_id_bad[element.attrib['id']] += 1
return None
for attr in element.attrib:
if attr in way_attr_fields:
way_attribs[attr] = element.attrib[attr]
position = 0
for child in element:
temp = { }
if child.tag == 'tag':
m = correct_chars_re.search(child.attrib['k']) # No match returns None
if not m:
print ('Way key -- Problem char! ', 'key = ', child.attrib['k'], ' value = ', child.attrib['v'])
counts['way child key eliminated'] += 1
infoKey = 'way key: ' + child.attrib['k']
bad_keys[infoKey] += 1
continue # eliminate the problematic child tag
# Fix value
fixed = fixer(child, 'Way') # Correct or eliminate the child <tag> value
# Function fix_it returns None if there is a data problem
if fixed == '$skip':
counts['way tag skipped'] += 1
continue
if not fixed:
counts['way child value eliminated'] += 1
continue # Eliminate this child tag
else:
temp['id'] = element.attrib['id'] # Save the fixed child tag for writing into csv file
temp['value'] = fixed
if ':' in child.attrib['k']:
k = child.attrib['k'].split(':',1)
temp['type'] = k[0]
temp['key'] = k[1]
else:
temp['key'] = child.attrib['k']
temp['type'] = default_tag_type
counts['way tag count'] += 1 # count the child tags not eliminated
tags.append(temp)
elif child.tag == 'nd':
check = check_id(child.attrib['ref'])
if not check:
print ('Way Node reference is Null or not a number: ', child.attrib['ref'])
way_node_reference_bad[child.attrib['ref']] += 1
continue
temp['id'] = element.attrib['id']
temp['node_id'] = child.attrib['ref']
temp['position'] = position
position += 1
counts['way node tag count'] += 1 # count the child tags not eliminated
way_nodes.append(temp)
return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
# Filename: main_process.py
# Python 3.7
# Purpose: Validate and save the element dictionary to csv files
# Send each element in the XML file to the "build_dictionary_element" function (in file "element_to_dictionary.py")
# Validate the returned element dictionary against the schema and save it to csv files where the csv data can
# be imported to an SQL database for data analysis.
# The element is examined and for each child in the element, the child 'value' is sent to
# the "fix_it" function (in file "fix_it.py") for correcting.
# A Python dictionary is constructed and returned back to the "process_xml_elements" function
# (in file "main_process.py") for saving into the csv file.
import csv
import codecs
import pprint
import re
from collections import defaultdict
import xml.etree.cElementTree as ET
import cerberus
import sys
#==========================#
# Import .py files #
#==========================#
import db_schema
#===============================#
# Initialize file names #
#===============================#
OSM_PATH = "UpperWestSideFull.osm" # File size is 78.2MB
SCHEMA = db_schema.schema
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"
#=========================================#
# Construct and initialize lists #
#=========================================#
# Be sure the field order in the csv matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']
# =============================================================== #
# Main Process Helper Functions #
# =============================================================== #
def get_element_tree(osm_file, tags=('node', 'way')):
"""Parse XML file data and yield an element tree.
Iteratively step through each top level XML element <node> or <way>
and read in sections of the XML file as a tree of the element
Arguments:
osm_file -- the Open Street Map XML file to process
tags -- list of XML parent tags to process
"""
context = ET.iterparse(osm_file, events=('start', 'end'))
# iterparse returns a stream of events between start and end.
# Returns an iterator providing (event, element) pairs
# parses an XML section into an element tree incrementally
_, root = next(context)
# root saves a reference to the iterator (block of XML) currently in process
# next() returns the next item in an iterator. Do not use context.next() in Python 3
# _ is a dummy variable for event. We are only interested in the root reference
for event, element in context: # the result is an iterable that returns a stream of (event, element) tuples
if event == 'end': # end returns the fully populated element (including children)
counts['element count'] += 1
if element.tag in tags:
yield element # yield returns a generator
root.clear() # remove the XML section from memory
else:
counts['not a node or way count'] += 1
del context
return
# Raise Validation Error if dictionary does not match schema
def validate_dictionary(dict, validator, schema=SCHEMA):
"""Runs the Cerberus data validation library to validate the dictionary against the schema.
Raises an exception for a validation error, or returns None if dictionary is valid
Arguments:
dict -- the element tree dictionary for the current element tree in the XML file iteration
validator -- Cerberus
schema -- the SQL database schema in Python format
"""
if validator.validate(dict, schema) is not True:
print ('Validation ERROR!')
print ('Dictionary = ', dict)
print ('\nSchema = ', schema)
print ('\n-------------\n')
for field, errors in validator.errors.items():
message_string = "\nElement of type '{0}' has the following errors:\n{1}"
error_string = pprint.pformat(errors)
raise Exception(message_string.format(field, error_string))
return
# ================================================== #
# Main Function #
# ================================================== #
def process_xml_elements(file_in, validate):
"""Iteratively process each XML element tree, build dictionary, validate, and write to CSV files.
Aborts execution if a problem occurs or returns None if successful
Steps through each element in the tree, and calls the function build_dictionary_element_tree
to assemble the dictionary
Sends the dictionary to the validator
Writes out the dictionary as rows in a CSV file
Prints a report and returns if successful or aborts if problem occurs
Arguments:
file_in -- the Open Street Map XML file to process
validate -- boolean switch to turn on or off validation
"""
response = initialize()
if not response:
print ('Fatal Error initializing dictionaries')
print ('\nTerminating execution...')
return None
with open(NODES_PATH, 'w') as nodes_file, \
open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
open(WAYS_PATH, 'w') as ways_file, \
open(WAY_NODES_PATH, 'w') as way_nodes_file, \
open(WAY_TAGS_PATH, 'w') as way_tags_file:
nodes_writer = csv.DictWriter(nodes_file, fieldnames = NODE_FIELDS)
node_tags_writer = csv.DictWriter(nodes_tags_file, fieldnames = NODE_TAGS_FIELDS)
ways_writer = csv.DictWriter(ways_file, fieldnames = WAY_FIELDS)
way_nodes_writer = csv.DictWriter(way_nodes_file, fieldnames = WAY_NODES_FIELDS)
way_tags_writer = csv.DictWriter(way_tags_file, fieldnames = WAY_TAGS_FIELDS)
nodes_writer.writeheader()
node_tags_writer.writeheader()
ways_writer.writeheader()
way_nodes_writer.writeheader()
way_tags_writer.writeheader()
validator = cerberus.Validator()
print ("\nDATA ELIMINATIONS\n")
for element_tree in get_element_tree(file_in, tags=('node', 'way')):
dict = build_dictionary_element_tree(element_tree)
if dict: # returns False if dict is equal to '0', None', '', False, or empty structure
counts['node way count'] += 1
if validate is True:
validate_dictionary(dict, validator, schema=SCHEMA)
if element_tree.tag == 'node':
counts['node count'] += 1
nodes_writer.writerow(dict['node'])
node_tags_writer.writerows(dict['node_tags'])
elif element_tree.tag == 'way':
counts['way count'] += 1
ways_writer.writerow(dict['way'])
way_nodes_writer.writerows(dict['way_nodes'])
way_tags_writer.writerows(dict['way_tags'])
else:
print (' -- Dictionary returned is: ', dict)
print_summary()
print_detailed_fixes(counts)
print ()
if validate is True:
print ('Validation... Passed')
print ('\nCSV files created')
return
# ================================================================================= #
# Function to print a report of the process #
# ================================================================================= #
def print_summary():
"""Prints a summary report of data corrections and eliminations, and returns None."""
print ('\n-------')
print ('SUMMARY')
print ("\nTag counts:")
print (" Nodes: {:,}".format(counts['node count']) )
print (" Ways: {:,}".format(counts['way count']) )
print (" Nodes tags: {:,}".format(counts['node tag count']) )
print (" Ways tags: {:,}".format(counts['way tag count']) )
print (" Ways Nodes: {:,}".format(counts['way node tag count']) )
print ('\nEliminated tag counts:')
print (" Tags with bad data values")
print (' Nodes tags voided: {:,}'.format(counts['node child value eliminated']))
print (' Ways tags voided: {:,}'.format(counts['way child value eliminated']))
print ('\n Tags with corrupt ID')
total = sum(node_id_bad.values())
print (' Nodes removed: {:,}'.format(total))
total = sum(way_id_bad.values())
print (' Ways removed: {:,}'.format(total))
print ("\n Tags with corrupt keys")
print (' Nodes tags with key problem: {:,}'.format(counts['node child key eliminated']))
print (' Ways tags with key problem: {:,}'.format(counts['way child key eliminated']))
print ("\n Tags with defective reference")
total = sum(way_node_reference_bad.values())
print (' Ways Nodes tags voided: {:,}'.format(total))
print ("\nSkipped tag counts")
print (' Node child tags skipped: {:,}'.format(counts['node tag skipped']))
print (' Way child tags skipped: {:,}'.format(counts['way tag skipped']))
total = counts['node tag skipped'] + counts['way tag skipped']
print (' Total tags skipped: {:,}'.format(total))
print ( "\nTotal non-node or non-way count: {:,}".format(counts['not a node or way count']) )
print ( "\nTotal elements processed: {:,}".format(counts['element count']) )
return
#========================#
# Runner #
#========================#
if __name__ == '__main__':
# Note: Validation is ~ 10X slower
# Change validate to validate = True to turn on validation
process_xml_elements(OSM_PATH, validate = False) ### CHANGE to True to Validate
The summary report for the Test dataset is provided below for readers wishing to run the enclosed Python files.

Referring to main_process.py above, during the parsing process, the XML tag data for each XML element tree, is sanitized and stored into a Python dictionary. This dictionary, and the Python schema, are sent to the Cerberus validator to confirm the dictionary created represents the desired database schema. Only after confirmation, is the data written to the CSV file.
The original data from the Open Street Map XML file is parsed, corrected where possible or eliminated (data cleaning), and saved into CSV files for importing into an SQL database.
First, checks are performed to validate the cleaning process. The number of records in the CSV files are compared against the tag counts output by the Python processing. If the counts match, this validates the CSV files contain only the correct data with the dirty data removed.
The function csv_row_count() in the cell below, performs the check.
Python code:
xml_csv_validation_routines.py
# Filename: xml_csv_validation_routines.py
# Python 3.7
# Check the number of rows in csv files
import csv
from collections import defaultdict
import os
import sys
csv_counts = defaultdict(int)
def csv_row_count():
"""Prints the number of rows in the CSV files and returns None."""
if not os.path.exists("nodes_tags.csv"):
print ("Cannot find CSV files...")
sys.exit()
print ('\nCSV FILE RECORD COUNTS\n')
with open('nodes.csv', 'r') as csv_file:
reader = csv.reader(csv_file) # comma is default delimiter
csv_counts['nodes_row_count'] = sum(1 for row in reader)
print ('Node number of rows: {:,}'.format(csv_counts['nodes_row_count'] - 1)) # Subtract header row
csv_file.close()
with open('nodes_tags.csv', 'r') as csv_file:
reader = csv.reader(csv_file) # comma is default delimiter
csv_counts['nodes_tags_row_count'] = sum(1 for row in reader)
print ('Node tags number of rows: {:,}'.format(csv_counts['nodes_tags_row_count'] - 1)) # Subtract header row
csv_file.close()
with open('ways.csv', 'r') as csv_file:
reader = csv.reader(csv_file) # comma is default delimiter
csv_counts['ways_row_count'] = sum(1 for row in reader)
print ('\nWay number of rows: {:,}'.format(csv_counts['ways_row_count'] - 1)) # Subtract header row
csv_file.close()
with open('ways_tags.csv', 'r') as csv_file:
reader = csv.reader(csv_file) # comma is default delimiter
csv_counts['ways_tags_row_count'] = sum(1 for row in reader)
print ('Way tags number of rows: {:,}'.format(csv_counts['ways_tags_row_count'] - 1)) # Subtract header row
csv_file.close()
with open('ways_nodes.csv', 'r') as csv_file:
reader = csv.reader(csv_file) # comma is default delimiter
csv_counts['ways_nodes_row_count'] = sum(1 for row in reader)
print ('Way Node number of rows: {:,}'.format(csv_counts['ways_nodes_row_count'] - 1)) # Subtract header row
csv_file.close()
return
csv_row_count()
The number of different tags in the original XML file are counted. These numbers are compared against the number of records in the CSV files. The expectation is the number of records in the CSV files is less than the number in the XML file due to elimination of dirty data. Further, when the number of bad data is tallied, the counts between the XML and CSV files are expected to reconcile.
The function count_xml_tags() in the cell below, performs the XML check.
Python code:
xml_csv_validation_routines.py
# Filename: xml_csv_validation_routines.py
# Python 3.7
# Count all the tags in the XML file
# Reconcile check for number of records in csv files
import xml.etree.cElementTree as ET
from collections import defaultdict
import re
import pprint
map_file = 'UpperWestSideFull.osm'
tags = defaultdict(int)
children = defaultdict(int)
problem_counts = defaultdict(int)
valid_keys = [ "addr:housenumber",
"amenity",
"name",
"cuisine",
"shop",
"building",
"addr:street",
"addr:state",
"addr:city",
"addr:postcode",
"phone",
"email",
"website",
"url",
"tiger:reviewed",
"inscription_1",
"inscription_2",
"inscription_date",
"nrhp:inscription_date"]
correct_chars_re = re.compile(r"^[a-zA-Z:\-_1-9]+$")
def initialize():
"""Clears the dictionaries and returns a boolean."""
try:
tags.clear()
children.clear()
problem_counts.clear()
return True
except:
return None
def check_id(id):
"""Checks the ID is valid and returns it or None if it is invalid.
Arguments:
id -- the ID number attribute of the element tags <node> or <way>
"""
id = id.strip()
if id and id.isdigit(): # id must only be a number
return id
else:
return None
def separator(dict):
"""Formats values in a dictionary with the thousands separator and returns the dictionary.
Arguments:
dict -- the dictionary to be formatted
"""
for k,v in dict.items():
v = "{:,}".format(v)
dict[k] = str(v)
return dict
def element_tree(osm_file):
"""Parse XML file data and yield an element tree.
Iteratively step through each top level XML element and read in sections of the XML file
as a tree of the element
Arguments:
osm_file -- the Open Street Map XML file to process
"""
context = ET.iterparse(osm_file, events=('start', 'end'))
_, root = next(context) # root saves a reference to the iterator (block of XML) currently in process
for event, element in context: # the result is an iterable that returns a stream of (event, element) tuples
if event == 'end': # end returns the fully populated element (including children)
yield element # yield returns a generator
root.clear()
del context
return
def count_xml_tags(filename):
"""Counts all the tags in the XML file and returns None.
Reconcile check for the number of records in the CSV files
Arguments:
filename -- the Open Street Map XML file to process
"""
for element in element_tree(filename):
bad_id = False
tags[element.tag] += 1
if element.tag == 'node':
check = check_id(element.attrib['id'])
if not check:
print ('Node ID is Null or not a number: ', element.attrib['id'])
problem_counts['node id bad'] += 1
bad_id = True # No continue here because counting ALL tags
if element.tag == 'way':
check = check_id(element.attrib['id'])
if not check:
print ('Way ID is Null or not a number: ', element.attrib['id'])
problem_counts['way id bad'] += 1
bad_id = True # No continue here because counting ALL tags
for child in element:
if child.tag == 'nd': # Check the 'nd' way node element for ID
if bad_id:
problem_counts['nd bad'] += 1
print ('Way node ID is bad: ', element.attrib['id'])
else:
check = check_id(child.attrib['ref']) # Check the 'nd' way node element for reference ID
if not check:
problem_counts['nd bad'] += 1
print ('Way node reference is bad: ', child.attrib['ref'])
if child.tag == 'tag' and (child.attrib['k'] in valid_keys) and bad_id: # Valid keys with bad ID
if element.tag == 'node':
print (' ', element.tag.capitalize(), ': k = ', child.attrib['k'], ' id = ', element.attrib['id'], ' Problem: Corrupt ID')
problem_counts['node tag bad'] += 1
if element.tag == 'way':
print (' ', element.tag.capitalize(), ': k = ', child.attrib['k'], ' v = ', child.attrib['v'],' id = ', element.attrib['id'], ' Problem: Corrupt ID')
problem_counts['way tag bad'] += 1
if child.tag == 'tag' and (element.tag in ['node', 'way']) and not bad_id: #Note: Bad keys with good ID
m = not correct_chars_re.search(child.attrib['k']) # Check for corrupt keys
if m and not ('cityracks' in child.attrib['k']):
if element.tag == 'node':
print (' ', element.tag.capitalize(), ': k = ', child.attrib['k'], ' id = ', element.attrib['id'], ' Problem: Corrupt key')
problem_counts['node key bad'] += 1
if element.tag == 'way':
print (' ', element.tag.capitalize(), ': k = ', child.attrib['k'], ' id = ', element.attrib['id'], ' Problem: Corrupt key')
problem_counts['way key bad'] += 1
try:
child_key = child.attrib['k']
except:
child_key = None
continue
if (element.tag in ['node', 'way']) and (child_key in valid_keys):
children[element.tag + ' ' + child_key] += 1
children['Total child tags'] += 1
if element.tag == 'node':
children['Total node tags'] += 1
else:
children['Total way tags'] += 1
return
def count_all_tags():
"""Calls the count routines, prints a report, and returns None."""
response = initialize()
if not response:
print ("Cannot perform initialization...")
print ("...program execution terminated")
return None
count_xml_tags(map_file)
total = sum(tags.values())
separator(tags)
separator(children)
print ("\n-------------------")
print ("XML FILE TAG COUNTS\n")
print (" Nodes: ", tags['node'])
print (" Ways: ", tags['way'])
print (" Ways Nodes: ", tags['nd'], "\n")
print (" Nodes tags: ", children['Total node tags'])
print (" Ways tags: ", children['Total way tags'], "\n")
print ("Total sum of XML tags processed: {:,}".format(total) )
print ("\n----------------")
print ("XML PROBLEM TAGS\n")
print ("Node ID problem: ", problem_counts['node id bad'])
print ("Way ID problem: ", problem_counts['way id bad'])
print ("Way Node problem: ", problem_counts['nd bad'])
print ("Node tag problem: ", problem_counts['node tag bad'])
print ("Way tag problem: ", problem_counts['way tag bad'])
print ("\nNode tag key problem: ", problem_counts['node key bad'])
print ("Way tag key problem: ", problem_counts['way key bad'])
print ("\n------------")
print ("ELEMENT TAGS\n")
pprint.pprint(tags)
print ("\n----------")
print ("CHILD TAGS\n")
pprint.pprint(children)
return
if __name__ == "__main__":
count_all_tags()
The table below summarizes the comparison. The table confirms the count differences in the files equals the number of bad data eliminated.
This validates that the problem data is eliminated from the CSV files -- which are used to import the data into the database. Examine the XML tag data and reconcile with the CSV record data. The difference between them is expected to equal the number of problem data discarded.
The function make_table() below creates the reconciled table.
Python code:
xml_csv_validation_routines.py
# Filename: xml_csv_validation_routines.py
# Python 3.7
# Print a table of the reconciled CSV and XML count differences
from prettytable import PrettyTable
def make_table():
"""Prints a table of the reconciled CSV and XML count differences and returns None."""
pt = PrettyTable()
pt.field_names = [" ", " XML Tag Count ", " Eliminated XML Tags ", " CSV Record Count ",
" Difference is Tags Eliminated "]
pt.add_row([" ", " ", " ", " ", " "])
pt.add_row([" <node> <tag> ", children['Total node tags'], problem_counts['node tag bad'], "{:,}".format(csv_counts['nodes_tags_row_count'] - 1),
(int(children['Total node tags'].replace(',', '')) - (csv_counts['nodes_tags_row_count'] - 1) - problem_counts['node tag bad'])])
pt.add_row([" ", " ", " ", " ", " "])
pt.add_row([" <way> <tag> ", children['Total way tags'], problem_counts['way tag bad'], "{:,}".format(csv_counts['ways_tags_row_count'] - 1),
(int(children['Total way tags'].replace(',', '')) - (csv_counts['ways_tags_row_count'] - 1) - problem_counts['way tag bad'])])
pt.add_row([" ", " ", " ", " ", " "])
pt.add_row([" <node> ", tags['node'], problem_counts['node id bad'], "{:,}".format(csv_counts['nodes_row_count'] - 1),
(int(tags['node'].replace(',', '')) - (csv_counts['nodes_row_count'] - 1) - problem_counts['node id bad'])])
pt.add_row([" ", " ", " ", " ", " "])
pt.add_row([" <way> ", tags['way'], problem_counts['way id bad'], "{:,}".format(csv_counts['ways_row_count'] - 1),
(int(tags['way'].replace(',', '')) - (csv_counts['ways_row_count'] - 1) - problem_counts['way id bad'])])
pt.add_row([" ", " ", " ", " ", " "])
pt.add_row([" <way node> ", tags['nd'], problem_counts['nd bad'], "{:,}".format(csv_counts['ways_nodes_row_count'] - 1),
(int(tags['nd'].replace(',', '')) - (csv_counts['ways_nodes_row_count'] - 1) - problem_counts['nd bad'])])
print ("\n TABLE OF RECORD COUNT DIFFERENCES\n")
print(pt)
return
make_table()
The data in the CSV files are imported into an SQLite database. The database and tables are created according to the data_wrangling_schema.sql schema specification. The 'id' fields in the tables are used as the database reference keys.
# Filename: data_wrangling_schema.sql
CREATE TABLE IF NOT EXISTS nodes (
id INTEGER PRIMARY KEY NOT NULL,
lat REAL,
lon REAL,
user TEXT,
uid INTEGER,
version INTEGER,
changeset INTEGER,
timestamp TEXT
);
CREATE TABLE IF NOT EXISTS nodes_tags (
id INTEGER,
key TEXT,
value TEXT,
type TEXT,
FOREIGN KEY (id) REFERENCES nodes(id)
);
CREATE TABLE IF NOT EXISTS ways (
id INTEGER PRIMARY KEY NOT NULL,
user TEXT,
uid INTEGER,
version TEXT,
changeset INTEGER,
timestamp TEXT
);
CREATE TABLE IF NOT EXISTS ways_tags (
id INTEGER NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
type TEXT,
FOREIGN KEY (id) REFERENCES ways(id)
);
CREATE TABLE IF NOT EXISTS ways_nodes (
id INTEGER NOT NULL,
node_id INTEGER NOT NULL,
position INTEGER NOT NULL,
FOREIGN KEY (id) REFERENCES ways(id),
FOREIGN KEY (node_id) REFERENCES nodes(id)
);
For Python processing purposes, the SQL schema is translated to a Python dictionary structure -- see the db_schema.py module in the cell below.
# Filename: db_schema.py
# Python 3.7
# Note: The Cerberus library is used to validate the Python dictionary created, against the database schema.
# The Cerberus validator requires the schema is stored in a .py file
# The Cerberus library provides int() and float() type coercion functions to convert data before validation
# It is an additional check that if the values are not the type specified in the schema, they are
# converted if possible, or an error is flagged.
schema = {
'node': {
'type': 'dict',
'schema': {
'id': {'required': True, 'type': 'integer', 'coerce': int},
'lat': {'required': True, 'type': 'float', 'coerce': float},
'lon': {'required': True, 'type': 'float', 'coerce': float},
'user': {'required': True, 'type': 'string'},
'uid': {'required': True, 'type': 'integer', 'coerce': int},
'version': {'required': True, 'type': 'string'},
'changeset': {'required': True, 'type': 'integer', 'coerce': int},
'timestamp': {'required': True, 'type': 'string'}
}
},
'node_tags': {
'type': 'list',
'schema': {
'type': 'dict',
'schema': {
'id': {'required': True, 'type': 'integer', 'coerce': int},
'key': {'required': True, 'type': 'string'},
'value': {'required': True, 'type': 'string'},
'type': {'required': True, 'type': 'string'}
}
}
},
'way': {
'type': 'dict',
'schema': {
'id': {'required': True, 'type': 'integer', 'coerce': int},
'user': {'required': True, 'type': 'string'},
'uid': {'required': True, 'type': 'integer', 'coerce': int},
'version': {'required': True, 'type': 'string'},
'changeset': {'required': True, 'type': 'integer', 'coerce': int},
'timestamp': {'required': True, 'type': 'string'}
}
},
'way_nodes': {
'type': 'list',
'schema': {
'type': 'dict',
'schema': {
'id': {'required': True, 'type': 'integer', 'coerce': int},
'node_id': {'required': True, 'type': 'integer', 'coerce': int},
'position': {'required': True, 'type': 'integer', 'coerce': int}
}
}
},
'way_tags': {
'type': 'list',
'schema': {
'type': 'dict',
'schema': {
'id': {'required': True, 'type': 'integer', 'coerce': int},
'key': {'required': True, 'type': 'string'},
'value': {'required': True, 'type': 'string'},
'type': {'required': True, 'type': 'string'}
}
}
}
}
The SQLite database is created and the tables are generated as per the schema. This is performed programmatically in Python -- see the function create_database() in the database_routines.py code in the cell below.
Python code:
database_routines.py
# Filename: database_routines.py
# Python 3.7
import sqlite3 as sql
import csv
import os
import sys
#------------------------------------#
# Create Sqlite3 database and tables #
#------------------------------------#
def create_database():
"""Creates SQLite database and tables, and returns None."""
if os.path.exists("data_wrangling_project.db"):
print ("Database already exists...")
os.remove("data_wrangling_project.db")
print ("...database deleted")
try:
connection = sql.connect("data_wrangling_project.db")
print ("\nDatabase created...")
except:
print ("Error -- cannot connect to the database")
sys.exit()
with connection:
cur = connection.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS nodes ( \
id INTEGER PRIMARY KEY NOT NULL, \
lat REAL, \
lon REAL, \
user TEXT, \
uid INTEGER, \
version INTEGER, \
changeset INTEGER, \
timestamp TEXT \
);")
cur.execute("CREATE TABLE IF NOT EXISTS nodes_tags ( \
id INTEGER NOT NULL, \
key TEXT, \
value TEXT, \
type TEXT, \
FOREIGN KEY (id) REFERENCES nodes(id) \
);")
cur.execute("CREATE TABLE IF NOT EXISTS ways ( \
id INTEGER PRIMARY KEY NOT NULL, \
user TEXT, \
uid INTEGER, \
version TEXT, \
changeset INTEGER, \
timestamp TEXT \
);")
cur.execute("CREATE TABLE IF NOT EXISTS ways_tags ( \
id INTEGER NOT NULL, \
key TEXT, \
value TEXT, \
type TEXT, \
FOREIGN KEY (id) REFERENCES ways(id) \
);")
cur.execute("CREATE TABLE IF NOT EXISTS ways_nodes ( \
id INTEGER NOT NULL, \
node_id INTEGER NOT NULL, \
position INTEGER NOT NULL, \
FOREIGN KEY (id) REFERENCES ways(id), \
FOREIGN KEY (node_id) REFERENCES nodes(id) \
);")
connection.commit()
cur.close()
connection.close()
print ("...database tables created")
return
create_database()
Data in the CSV files are imported into the database tables. The names of the columns are specified and the abort statement is applied to flag data insertion errors. The number of records imported to each table is recorded to double-check the counts are consistent.
If an applicable constraint violation occurs, the ABORT resolution algorithm aborts the current SQL statement with an SQLITE_CONSTRAINT error and backs out any changes made by the current SQL statement; but changes caused by prior SQL statements within the same transaction are preserved and the transaction remains active.
See the function read_csv_files() in the cell below.
Python code:
database_routines.py
# Filename: database_routines.py
# Python 3.7
import sqlite3 as sql
import csv
import sys
import os
#----------------------------------------------------#
# Read CSV files and write data into database tables #
#----------------------------------------------------#
def read_csv_files():
"""Reads CSV files and writes data into database tables, and returns None."""
if os.path.exists("data_wrangling_project.db"):
print ("\nDatabase in order...")
else:
print ("\nDatabase does not exist...\n")
sys.exit()
if not os.path.exists("nodes_tags.csv"):
print ("Cannot find CSV files...")
sys.exit()
try:
con = sql.connect("data_wrangling_project.db")
print ("Connected to database...\n")
except:
print ("\nError -- cannot connect to the database")
sys.exit()
cur = con.cursor()
nodes_row_count = 0
nodes_tags_row_count = 0
ways_row_count = 0
ways_tags_row_count = 0
ways_nodes_row_count = 0
with open('nodes.csv', 'r') as csv_file:
reader = csv.reader(csv_file) # comma is default delimiter
next(csv_file) # skip header row
for row in reader:
cur.execute("INSERT OR ABORT INTO nodes (id, lat, lon, user, uid, version, changeset, timestamp) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?);", row)
nodes_row_count += 1
print ('Nodes written to db...')
print ('Nodes number of rows: {:,}'.format(nodes_row_count))
csv_file.close()
with open('nodes_tags.csv', 'r') as csv_file:
reader = csv.reader(csv_file) # comma is default delimiter
next(csv_file) # skip header row
for row in reader:
cur.execute("INSERT OR ABORT INTO nodes_tags (id, key, value, type) VALUES (?, ?, ?, ?);", row)
nodes_tags_row_count += 1
print ('\nNodes Tags written to db...')
print ('Nodes Tags number of rows: {:,}'.format(nodes_tags_row_count))
csv_file.close()
with open('ways.csv', 'r') as csv_file:
reader = csv.reader(csv_file) # comma is default delimiter
next(csv_file) # skip header row
for row in reader:
cur.execute("INSERT OR ABORT INTO ways (id, user, uid, version, changeset, timestamp) \
VALUES (?, ?, ?, ?, ?, ?);", row)
ways_row_count += 1
print ('\nWays written to db...')
print ('Ways number of rows: {:,}'.format(ways_row_count))
csv_file.close()
with open('ways_tags.csv', 'r') as csv_file:
reader = csv.reader(csv_file) # comma is default delimiter
next(csv_file) # skip header row
for row in reader:
cur.execute("INSERT OR ABORT INTO ways_tags (id, key, value, type) VALUES (?, ?, ?, ?);", row)
ways_tags_row_count += 1
print ('\nWays Tags written to db...')
print ('Ways Tags number of rows: {:,}'.format(ways_tags_row_count))
csv_file.close()
with open('ways_nodes.csv', 'r') as csv_file:
reader = csv.reader(csv_file) # comma is default delimiter
next(csv_file) # skip header row
for row in reader:
cur.execute("INSERT OR ABORT INTO ways_nodes (id, node_id, position) VALUES (?, ?, ?);", row)
ways_nodes_row_count += 1
print ('\nWays Nodes written to db...')
print ('Ways Nodes number of rows: {:,}'.format(ways_nodes_row_count))
csv_file.close()
con.commit()
cur.close()
con.close()
return
read_csv_files()
The number of rows in each database table is counted. These numbers are compared with the number of rows in the CSV files and the number of correct data output by the program main_process.py.
Refer to the function count_rows() in the cell below.
Python code:
database_routines.py
# Filename: database_routines.py
# Python 3.7
import sqlite3 as sql
import sys
import os
#----------------------------------------#
# Count the number of rows in each table #
#----------------------------------------#
def count_rows():
"""Counts the number of rows in each database table, prints a report, and returns None."""
if not os.path.exists("data_wrangling_project.db"):
print ("\nDatabase does not exist...\n")
sys.exit()
try:
db = sql.connect("data_wrangling_project.db")
except:
print ("\nError -- cannot connect to the database")
sys.exit()
c = db.cursor()
query = "select count(*) as num from nodes;"
c.execute(query)
rowsn = c.fetchall()
query = "select count(*) as num from nodes_tags;"
c.execute(query)
rowsnt = c.fetchall()
query = "select count(*) as num from ways;"
c.execute(query)
rowsw = c.fetchall()
query = "select count(*) as num from ways_tags;"
c.execute(query)
rowswt = c.fetchall()
query = "select count(*) as num from ways_nodes;"
c.execute(query)
rowswn = c.fetchall()
print ("\nCount the number of rows in each table:\n")
print ('Nodes: {:,}'.format(*rowsn[0])) # * before tuple unpacks the tuple into separate arguments
print ('Nodes Tags: {:,}'.format(*rowsnt[0]))
print ('Ways: {:,}'.format(*rowsw[0]))
print ('Ways Tags: {:,}'.format(*rowswt[0]))
print('Ways Nodes: {:,}'.format(*rowswn[0]))
c.close()
db.close()
return
count_rows()
The above check validates the amount of data loaded into the database tables are correct.
This project requires information from both the <nodes> and the <ways> tables because data of interest is located in both. A <node> is a point on the map and <ways> are paths connecting the nodes.
The strategy is to break complicated database queries into smaller pieces that are simpler and more reliable to execute. First, consolidate the required data into new tables, and then execute the retrieval queries on the new tables.
Create a new table to consolidate <node> <tags> and <way> <tags>, and similarly another for <nodes> and <ways>. Note that latitude and longitude data only occur in <nodes>. ID, user, and timestamp data is included in both the <nodes> and <ways> tables. The NULL statement is used as a placeholder for the latitude and longitude columns in the <ways> table.
See the function consolidated_tables() in the code below.
Python code:
database_routines.py
# Filename: database_routines.py
# Python 3.7
import sqlite3 as sql
import sys
import os
#---------------------------------------#
# Create consolidated database tables #
#---------------------------------------#
def consolidated_tables():
"""Creates consolidated database tables and returns None."""
if not os.path.exists("data_wrangling_project.db"):
print ("\nDatabase does not exist...\n")
sys.exit()
try:
dbConnect = sql.connect("data_wrangling_project.db")
print ("\nConnected to database...")
except:
print ("\nError -- cannot connect to the database")
sys.exit()
cur = dbConnect.cursor()
# Clean up database
cur.execute("""DROP TABLE IF EXISTS union_all_tags;""")
cur.execute("""DROP TABLE IF EXISTS nodes_union_ways;""")
dbConnect.commit()
# Create a new table to consolidate tag data
cur.execute("""CREATE TABLE IF NOT EXISTS union_all_tags (
id INTEGER NOT NULL,
key TEXT,
value TEXT,
type TEXT
);""")
dbConnect.commit()
cur.execute("""INSERT OR ABORT INTO union_all_tags
SELECT id, key, value, type FROM nodes_tags
UNION ALL
SELECT id, key, value, type FROM ways_tags
;""")
dbConnect.commit()
print ("\nConsolidated tag table done...")
# Create a new table to consolidate user and timestamp data
cur.execute("""CREATE TABLE IF NOT EXISTS nodes_union_ways (
id INTEGER NOT NULL,
user TEXT,
timestamp TEXT,
lat REAL,
lon REAL
);""")
dbConnect.commit()
cur.execute("""INSERT OR ABORT INTO nodes_union_ways
SELECT id, user, timestamp, lat, lon FROM nodes
UNION
SELECT id, user, timestamp, NULL as lat, NULL as lon FROM ways
;""")
dbConnect.commit()
print ("\nConsolidated user table done\n")
cur.close()
dbConnect.close()
return
consolidated_tables()
SQL queries are run to provide a sense of the data coverage and quality.
Python code:
database_queries.py
# Filename: database_queries.py
# Python 3.7
import sqlite3 as sql
import sys
import os
#-----------------------------#
# Table print functions #
#-----------------------------#
def print_rows_3_cols(rows, title, comment, col1, col2, col3):
"""Prints a table with 3 columns and returns None.
Arguments:
rows -- the list of rows returned from the SQL query
title -- the title of the printed table
comment -- sub-title of the printed table
col1, col2, col3 -- the names of the columns in the table
"""
print ("\n" + title + "\n" + "-"*len(title))
print ("\n" + comment + "\n")
# Prefix the size requirement with '-' to left justify
sys.stdout.write("%-30s %-20s %-60s\n" % (col1, col2, col3))
sys.stdout.write("%-30s %-20s %-60s\n" % ("-"*len(col1), "-"*len(col2), "-"*len(col3)))
if col3 != '':
for row in rows:
sys.stdout.write("%-30s %-20s %-60s\n" % (row[0], row[1], row[2]))
else:
for row in rows:
if row[0] in ['10023', '10024', '10025']:
sys.stdout.write("%-30s %-20s\n" % (row[0] + ' **', row[1]))
else:
sys.stdout.write("%-30s %-20s\n" % (row[0], row[1]))
return
def print_rows_4_cols(rows, title, col1, col2, col3, col4):
"""Prints a table with 4 columns and returns None.
Arguments:
rows -- the list of rows returned from the SQL query
title -- the title of the printed table
col1, col2, col3, col4 -- the names of the columns in the table
"""
print ("\n" + title + "\n" + "-"*len(title) + "\n")
# Prefix the size requirement with '-' to left justify, '+' to right justify
sys.stdout.write("%-17s %-18s %-56s %-19s\n" % (col1, col2, col3, col4))
sys.stdout.write("%-17s %-18s %-56s %-19s\n" % ("-"*len(col1), "-"*len(col2), "-"*len(col3), "-"*len(col4)))
current = rows[0][0]
for row in rows:
if row[0] != current:
print ( )
sys.stdout.write("%-17s %-18s %-56s %-19s\n" % (row[0], row[1], row[2], row[3]))
current = row[0]
return
def print_rows_5_cols(rows, title, col1, col2, col3, col4, col5):
"""Prints a table with 5 columns and returns None.
Arguments:
rows -- the list of rows returned from the SQL query
title -- the title of the printed table
col1, col2, col3, col4, col5 -- the names of the columns in the table
"""
print ("\n" + title + "\n" + "-"*len(title) + "\n")
# Prefix the size requirement with '-' to left justify, '+' to right justify
sys.stdout.write("%-12s %-17s %-59s %-13s %-13s\n" % (col1, col2, col3, col4, col5))
sys.stdout.write("%-12s %-17s %-59s %-13s %-13s\n" % ("-"*len(col1), "-"*len(col2), "-"*len(col3),
"-"*len(col4), "-"*len(col5)))
current = rows[0][0]
for row in rows:
if len(row[2]) > 55:
n = 58
s = [ ]
for i in range(0, len(row[2]), n):
s.append(row[2][i:i+n])
print ( )
for p in s:
sys.stdout.write("%-12s %-17s %-59s %-13s %-13s\n" % (row[0], row[1], p, row[3], row[4]))
current = row[0]
continue
if row[0] != current:
print ( )
sys.stdout.write("%-12s %-17s %-59s %-13s %-13s\n" % (row[0], row[1], row[2], row[3], row[4]))
current = row[0]
return
#------------------------#
# SQL database queries #
#------------------------#
def queries():
"""Executes SQL database queries, prints tables of results, and returns None."""
if not os.path.exists("data_wrangling_project.db"):
print ("\nDatabase does not exist...\n")
sys.exit()
try:
db = sql.connect("data_wrangling_project.db")
except:
print ("\nError -- cannot connect to the database")
sys.exit()
c = db.cursor()
title = 'Dates of data entry'
comment = ' Provides the Age of the Data \n Oldest date first, in descending order'
col1 = 'Timestamp'
col2 = 'Count'
col3 = 'Username'
query = "SELECT timestamp, COUNT(timestamp), user FROM nodes_union_ways \
GROUP BY timestamp ORDER BY timestamp ASC LIMIT 10;"
c.execute(query)
rows = c.fetchall()
if rows == []:
print ("\n" + title)
print ("...Warning: No data found!!")
else:
print_rows_3_cols(rows, title, comment, col1, col2, col3)
title = 'Zip code'
comment = ' List of the Zip Codes \n ** indicates Upper West Side'
col1 = 'Zip code'
col2 = 'Count'
col3 = ''
query = "SELECT value, COUNT(value) as Number FROM union_all_tags \
WHERE key = 'postcode' \
GROUP BY value ORDER BY Number DESC LIMIT 10;"
c.execute(query)
rows = c.fetchall()
if rows == []:
print ("\n" + title)
print ("...Warning: No data found!!")
else:
print_rows_3_cols(rows, title, comment, col1, col2, col3)
title = 'Burger Joints on the Upper West Side'
col1 = 'ID'
col2 = 'Key'
col3 = 'Value'
col4 = 'Timestamp'
query = "SELECT union_all_tags.id, key, value, timestamp FROM union_all_tags \
INNER JOIN nodes_union_ways \
ON nodes_union_ways.id = union_all_tags.id \
WHERE union_all_tags.id IN (SELECT id FROM union_all_tags \
WHERE key = 'cuisine' AND value = 'Burger' \
INTERSECT \
SELECT id FROM union_all_tags \
WHERE key = 'postcode' AND value IN ('10023', '10024', '10025') \
) \
;"
c.execute(query)
rows = c.fetchall()
if rows == []:
print ("\n" + title)
print ("...Warning: No data found!!")
else:
print_rows_4_cols(rows, title, col1, col2, col3, col4)
title = "Bookshops on the Upper West Side"
col1 = 'ID'
col2 = 'Key'
col3 = 'Value'
col4 = 'Timestamp'
query = "SELECT union_all_tags.id, key, value, timestamp FROM union_all_tags \
INNER JOIN nodes_union_ways \
ON nodes_union_ways.id = union_all_tags.id \
WHERE union_all_tags.id IN (SELECT id FROM union_all_tags \
WHERE key = 'shop' AND value = 'books' \
INTERSECT \
SELECT id FROM union_all_tags \
WHERE key = 'postcode' AND value IN ('10023', '10024', '10025') \
) \
;"
c.execute(query)
rows = c.fetchall()
if rows == []:
print ("\n" + title)
print ("...Warning: No data found!!")
else:
print_rows_4_cols(rows, title, col1, col2, col3, col4)
title = 'RadioShack'
col1 = 'ID'
col2 = 'Key'
col3 = 'Value'
col4 = 'Timestamp'
query = "SELECT union_all_tags.id, key, value, timestamp FROM union_all_tags \
INNER JOIN nodes_union_ways \
ON nodes_union_ways.id = union_all_tags.id \
WHERE union_all_tags.id IN (SELECT id FROM union_all_tags \
WHERE key = 'name' AND value IN ('RadioShack', 'Radio Shack', 'Radioshack') \
INTERSECT \
SELECT id FROM union_all_tags \
WHERE key = 'postcode' AND value IN ('10023', '10024', '10025') \
) \
;"
c.execute(query)
rows = c.fetchall()
if rows == []:
print ("\n" + title)
print ("...Warning: No data found!!")
else:
print_rows_4_cols(rows, title, col1, col2, col3, col4)
title = 'Inscription'
col1 = 'ID'
col2 = 'Key'
col3 = 'Value'
col4 = 'Latitude'
col5 = 'Longitude'
query = "SELECT union_all_tags.id, key, value, \
CASE WHEN lat IS NOT NULL \
THEN lat \
ELSE ' ' \
END, \
CASE WHEN lon IS NOT NULL \
THEN lon \
ELSE ' ' \
END \
FROM union_all_tags \
INNER JOIN nodes_union_ways \
ON nodes_union_ways.id = union_all_tags.id \
WHERE union_all_tags.id IN (SELECT id FROM union_all_tags \
WHERE key IN ('inscription_1', 'inscription_2', 'inscription_date', \
'nrhp:inscription_date') \
) \
;"
c.execute(query)
rows = c.fetchall()
if rows:
print_rows_5_cols(rows, title, col1, col2, col3, col4, col5)
else:
print ('\nQuery: No data retrieved from database!\n')
c.close()
db.close()
return
queries()
I found a statue in Central Park that was unbeknown to me, with its inscription -- to be revealed ahead.
Assess the data quality by coverage, accuracy, and age.
This plot answers the question: "How old is the data?"
In mapping applications, the age of the data is important because places change constantly. A map with old data is not useful in a rapidly changing metropolis such as New York.
Refer to the function plot_dates() in the module below.
Python code:
database_age_plot.py
# Filename: database_age_plot.py
# Python 3.7
import sqlite3 as sql
import matplotlib.pyplot as plt
from datetime import datetime
from operator import itemgetter
import sys
import os
def print_rows_2Columns(title, rows):
"""Prints a table with 2 columns and returns None.
Arguments:
title -- the title of the printed table
rows -- the list of rows returned from the SQL query
"""
print ('')
print (title.ljust(24), "\t Count")
print (("-------").ljust(24), "\t -----\n")
for row in rows:
print (str(row[0]).ljust(24), "\t", row[1])
return
#======================#
# Main routine #
#======================#
# Plot the distribution of the age of the data
def plot_dates():
"""Plots the distribution of the age of the data, and returns None."""
if not os.path.exists("data_wrangling_project.db"):
print ("\nDatabase does not exist...\n")
sys.exit()
try:
db = sql.connect("data_wrangling_project.db")
except:
print ("\nError -- cannot connect to the database")
sys.exit()
c = db.cursor()
title = 'Timestamp'
query = "SELECT timestamp, count(timestamp) FROM nodes_union_ways GROUP BY timestamp ORDER BY timestamp;"
c.execute(query)
rows = c.fetchall()
# print_rows_2Columns(title, rows)
# print ('\n---------------------------------------')
c.close()
db.close()
x = [ ]
y =[ ]
ticks = [ ]
tocks = [ ]
date_format = "%Y-%m-%dT%H:%M:%SZ"
for t in range(2007,2020,1):
ticks.append(datetime.strptime(str(t), "%Y"))
for t in range(0, 110, 10):
tocks.append(t)
for row in rows:
try:
datetime_object = datetime.strptime(row[0], date_format)
except:
print ('String to date time conversion error!!')
x.append(datetime_object)
y.append(row[1])
print ()
plt.figure(figsize=(11,7), clear = True)
ax = plt.subplot(111)
ax.bar(x, y, width = 200, color = (179/255.0, 204/255.0, 1.0)) # RGB color [0, 1.0] float divide by 255
ax.xaxis_date()
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
plt.xticks(ticks)
plt.yticks(tocks)
plt.xlabel('Date')
plt.ylabel('Date Counts')
plt.title('Distribution of Date Range\n')
plt.show()
sort_list = sorted(rows, key=itemgetter(0))
length = len(sort_list)
median_index = (length - 1) // 2 # integer division (quotient without remainder)
print (' '*5 + 'Number of dates: {:,}'.format(length) )
print (' '*5 + 'Date range: ')
oldest = sort_list[0][0] # minimum
newest = sort_list[length - 1][0] # maximum
print (' '*5 + oldest[:10] + ' to ' + newest[:10])
print (' '*5 + 'Median date:', sort_list[median_index][0][:10])
rows.clear()
return
if __name__ == "__main__":
plot_dates()
Central Park, New York
Database advisory: When retrieving data, use the timestamp field in the Nodes and Ways tables to determine the date when the data was entered. In large cities, and especially in New York, stores and restaurants are constantly changing. Information older than one year should be double-checked on the web.
Data wrangling is an arduous, time-consuming endeavor. The approach taken here is to programmatically clean the data before loading it into the database. Then, queries are run against the database to analyze the data.